Skip to content

Commit

Permalink
Merge pull request #299 from llIlll/fix-consume
Browse files Browse the repository at this point in the history
修复并行消费
  • Loading branch information
llIlll committed Sep 3, 2020
2 parents 4af790a + 439d035 commit f533a59
Show file tree
Hide file tree
Showing 7 changed files with 138 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,10 @@ public static TransportConfig convertTransportConfig(KeyValue attributes) {
transportConfig.setHeartbeatInterval(KeyValueHelper.getInt(attributes, JoyQueueTransportBuiltinKeys.HEARTBEAT_INTERVAL, transportConfig.getHeartbeatInterval()));
transportConfig.setHeartbeatTimeout(KeyValueHelper.getInt(attributes, JoyQueueTransportBuiltinKeys.HEARTBEAT_TIMEOUT, transportConfig.getHeartbeatTimeout()));
transportConfig.setSoLinger(KeyValueHelper.getInt(attributes, JoyQueueTransportBuiltinKeys.SO_LINGER, transportConfig.getSoLinger()));
transportConfig.setTcpNoDelay(attributes.getBoolean(JoyQueueTransportBuiltinKeys.CONNECTIONS, transportConfig.isTcpNoDelay()));
transportConfig.setTcpNoDelay(attributes.getBoolean(JoyQueueTransportBuiltinKeys.TCP_NO_DELAY, transportConfig.isTcpNoDelay()));
transportConfig.setKeepAlive(attributes.getBoolean(JoyQueueTransportBuiltinKeys.KEEPALIVE, transportConfig.isKeepAlive()));
transportConfig.setSoTimeout(KeyValueHelper.getInt(attributes, JoyQueueTransportBuiltinKeys.SO_TIMEOUT, transportConfig.getSoTimeout()));
transportConfig.setSendTimeout(KeyValueHelper.getInt(attributes, JoyQueueTransportBuiltinKeys.SEND_TIMEOUT, transportConfig.getSoTimeout()));
transportConfig.setSocketBufferSize(KeyValueHelper.getInt(attributes, JoyQueueTransportBuiltinKeys.SOCKET_BUFFER_SIZE, transportConfig.getSocketBufferSize()));
transportConfig.setMaxOneway(KeyValueHelper.getInt(attributes, JoyQueueTransportBuiltinKeys.MAX_ONEWAY, transportConfig.getMaxOneway()));
transportConfig.setMaxAsync(KeyValueHelper.getInt(attributes, JoyQueueTransportBuiltinKeys.MAX_ASYNC, transportConfig.getMaxAsync()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package org.joyqueue.client.internal.consumer.support;

import org.apache.commons.collections.CollectionUtils;
import org.joyqueue.client.internal.cluster.ClusterClientManager;
import org.joyqueue.client.internal.cluster.ClusterManager;
import org.joyqueue.client.internal.consumer.BaseMessageListener;
Expand All @@ -27,7 +26,6 @@
import org.joyqueue.client.internal.consumer.interceptor.ConsumerInterceptor;
import org.joyqueue.client.internal.consumer.interceptor.ConsumerInterceptorManager;
import org.joyqueue.client.internal.consumer.transport.ConsumerClientManager;
import org.joyqueue.client.internal.metadata.domain.PartitionGroupMetadata;
import org.joyqueue.client.internal.metadata.domain.TopicMetadata;
import org.joyqueue.client.internal.nameserver.NameServerConfig;
import org.joyqueue.client.internal.nameserver.helper.NameServerHelper;
Expand Down Expand Up @@ -133,13 +131,7 @@ protected MessagePoller createMessagePoller(String topic) {
}

if (config.getThread() == ConsumerConfig.NONE_THREAD) {
int maxPartition = 1;
if (CollectionUtils.isNotEmpty(topicMetadata.getPartitionGroups())) {
for (PartitionGroupMetadata partitionGroup : topicMetadata.getPartitionGroups()) {
maxPartition = Math.max(partitionGroup.getPartitions().size(), maxPartition);
}
}
config.setThread(maxPartition);
config.setThread(topicMetadata.getPartitions().size());
}

if (topicMetadata.getType().equals(TopicType.BROADCAST)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.Map;

/**
Expand Down Expand Up @@ -104,6 +105,9 @@ protected void doStop() {
* @return 未归档的发送日志条数
*/
public long getSendBacklogNum() {
if (sendArchiveService == null) {
return 0;
}
return sendArchiveService.remainMessagesSum();
}

Expand All @@ -113,6 +117,9 @@ public long getSendBacklogNum() {
* @return 剩余未归档消费日志的大小(文件数量 * 文件大小)
*/
public long getConsumeBacklogNum() {
if (consumeArchiveService == null) {
return 0;
}
return consumeArchiveService.getRemainConsumeLogFileNum();
}

Expand All @@ -122,6 +129,9 @@ public long getConsumeBacklogNum() {
* @return 未归档的发送日志条数
*/
public Map<String, Long> getSendBacklogNumByTopic() {
if (sendArchiveService == null) {
return Collections.emptyMap();
}
return sendArchiveService.getArchivePosition();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,8 @@ protected void validate() throws Exception {
this.concurrentConsumption =
consumeConfig.useLegacyConcurrentConsumer() ?
new ConcurrentConsumption(clusterManager, storeService, partitionManager, messageRetry, positionManager, filterMessageSupport, archiveManager, sessionManager):
new SlideWindowConcurrentConsumer(clusterManager, storeService, partitionManager, messageRetry, positionManager, filterMessageSupport, archiveManager, consumeConfig)
new SlideWindowConcurrentConsumer(clusterManager, storeService, partitionManager, messageRetry, positionManager,
filterMessageSupport, archiveManager, consumeConfig, brokerContext.getEventBus());
;
this.resetBroadcastIndexTimer = new Timer("joyqueuue-consume-reset-broadcast-index-timer");
}
Expand Down
Loading

0 comments on commit f533a59

Please sign in to comment.