Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/chubaostream/joyqueue
Browse files Browse the repository at this point in the history
  • Loading branch information
iamazy committed Aug 25, 2020
2 parents 794b624 + 92f5f4a commit 0d8befd
Show file tree
Hide file tree
Showing 154 changed files with 3,784 additions and 1,372 deletions.
4 changes: 4 additions & 0 deletions joyqueue-client/joyqueue-client-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@
<groupId>org.joyqueue</groupId>
<artifactId>joyqueue-network</artifactId>
</dependency>
<dependency>
<groupId>org.joyqueue</groupId>
<artifactId>joyqueue-client-loadbalance-adaptive</artifactId>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,18 @@
*/
package io.openmessaging.joyqueue.config;

import org.joyqueue.client.internal.consumer.config.ConsumerConfig;
import org.joyqueue.client.internal.nameserver.NameServerConfig;
import org.joyqueue.client.internal.producer.config.ProducerConfig;
import org.joyqueue.client.internal.producer.feedback.config.TxFeedbackConfig;
import org.joyqueue.client.internal.transport.config.TransportConfig;
import org.joyqueue.domain.QosLevel;
import io.openmessaging.KeyValue;
import io.openmessaging.joyqueue.domain.JoyQueueConsumerBuiltinKeys;
import io.openmessaging.joyqueue.domain.JoyQueueNameServerBuiltinKeys;
import io.openmessaging.joyqueue.domain.JoyQueueProducerBuiltinKeys;
import io.openmessaging.joyqueue.domain.JoyQueueTransportBuiltinKeys;
import io.openmessaging.joyqueue.domain.JoyQueueTxFeedbackBuiltinKeys;
import org.joyqueue.client.internal.consumer.config.ConsumerConfig;
import org.joyqueue.client.internal.nameserver.NameServerConfig;
import org.joyqueue.client.internal.producer.config.ProducerConfig;
import org.joyqueue.client.internal.producer.feedback.config.TxFeedbackConfig;
import org.joyqueue.client.internal.transport.config.TransportConfig;
import org.joyqueue.domain.QosLevel;

/**
* KeyValueConverter
Expand Down Expand Up @@ -109,6 +109,7 @@ public static ConsumerConfig convertConsumerConfig(KeyValue attributes) {
consumerConfig.setSessionTimeout(attributes.getLong(JoyQueueConsumerBuiltinKeys.SESSION_TIMEOUT, consumerConfig.getSessionTimeout()));
consumerConfig.setThread(KeyValueHelper.getInt(attributes, JoyQueueConsumerBuiltinKeys.THREAD, consumerConfig.getThread()));
consumerConfig.setFailover(attributes.getBoolean(JoyQueueConsumerBuiltinKeys.FAILOVER, consumerConfig.isFailover()));
consumerConfig.setForceAck(attributes.getBoolean(JoyQueueConsumerBuiltinKeys.FORCE_ACK, consumerConfig.isForceAck()));
consumerConfig.setLoadBalance(attributes.getBoolean(JoyQueueConsumerBuiltinKeys.LOADBALANCE, consumerConfig.isLoadBalance()));
consumerConfig.setLoadBalanceType(KeyValueHelper.getString(attributes, JoyQueueConsumerBuiltinKeys.LOADBALANCE_TYPE, consumerConfig.getLoadBalanceType()));
consumerConfig.setBroadcastGroup(KeyValueHelper.getString(attributes, JoyQueueConsumerBuiltinKeys.BROADCAST_GROUP, consumerConfig.getBroadcastGroup()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public interface JoyQueueConsumerBuiltinKeys extends OMSBuiltinKeys {

String FAILOVER = "CONSUMER_FAILOVER";

String FORCE_ACK = "CONSUMER_FORCE_ACK";

String LOADBALANCE = "CONSUMER_LOADBALANCE";

String LOADBALANCE_TYPE = "CONSUMER_LOADBALANCE_TYPE";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
import org.joyqueue.client.internal.metadata.domain.TopicMetadata;
import org.joyqueue.client.internal.nameserver.NameServerConfig;
import org.joyqueue.exception.JoyQueueCode;
import org.joyqueue.network.domain.BrokerNode;
import org.joyqueue.toolkit.time.SystemClock;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;

/**
Expand All @@ -51,22 +51,12 @@ public TopicMetadataHolder getTopicMetadata(String topic, String app) {
}

public void putTopicMetadata(String topic, String app, TopicMetadata topicMetadata) {
getTopicMap(app).put(topic, newTopicMetadataHolder(topic, topicMetadata));
}

public void putTopicMetadata(Map<String, TopicMetadata> topicMetadata, String app) {
ConcurrentMap<String, TopicMetadataHolder> topicMap = getTopicMap(app);
for (Map.Entry<String, TopicMetadata> entry : topicMetadata.entrySet()) {
topicMap.put(entry.getKey(), newTopicMetadataHolder(entry.getKey(), entry.getValue()));
}
}

public void setTopicMetadata(Map<String, TopicMetadata> topicMetadata, String app) {
ConcurrentMap<String, TopicMetadataHolder> newTopicMap = Maps.newConcurrentMap();
for (Map.Entry<String, TopicMetadata> entry : topicMetadata.entrySet()) {
newTopicMap.put(entry.getKey(), newTopicMetadataHolder(entry.getKey(), entry.getValue()));
TopicMetadataHolder oldTopicMetadataHolder = getTopicMap(app).get(topic);
TopicMetadata oldTopicMetadata = null;
if (oldTopicMetadataHolder != null) {
oldTopicMetadata = oldTopicMetadataHolder.getTopicMetadata();
}
topicMetadataCache.put(app, newTopicMap);
getTopicMap(app).put(topic, newTopicMetadataHolder(topic, topicMetadata, oldTopicMetadata));
}

protected ConcurrentMap<String, TopicMetadataHolder> getTopicMap(String app) {
Expand All @@ -81,8 +71,19 @@ protected ConcurrentMap<String, TopicMetadataHolder> getTopicMap(String app) {
return topicMap;
}

protected TopicMetadataHolder newTopicMetadataHolder(String topic, TopicMetadata topicMetadata) {
protected TopicMetadataHolder newTopicMetadataHolder(String topic, TopicMetadata topicMetadata, TopicMetadata oldTopicMetadata) {
if (topicMetadata.getCode().equals(JoyQueueCode.SUCCESS)) {

if (oldTopicMetadata != null) {
topicMetadata.setAttachments(oldTopicMetadata.getAttachments());
for (BrokerNode broker : topicMetadata.getBrokers()) {
BrokerNode oldBrokerNode = oldTopicMetadata.getBroker(broker.getId());
if (oldBrokerNode != null) {
broker.setAttachments(oldBrokerNode.getAttachments());
}
}
}

if (topicMetadata.isAllAvailable()) {
return new TopicMetadataHolder(topic, topicMetadata, SystemClock.now(), config.getUpdateMetadataInterval(), topicMetadata.getCode());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class ConsumerConfig {
private long sessionTimeout = 1000 * 60 * 1;
private int thread = NONE_THREAD;
private boolean failover = true;
private boolean forceAck = false;

private boolean loadBalance = true;
private String loadBalanceType = RoundRobinBrokerLoadBalance.NAME;
Expand All @@ -72,6 +73,7 @@ public ConsumerConfig copy() {
consumerConfig.setSessionTimeout(sessionTimeout);
consumerConfig.setThread(thread);
consumerConfig.setFailover(failover);
consumerConfig.setForceAck(forceAck);
consumerConfig.setLoadBalance(loadBalance);
consumerConfig.setLoadBalanceType(loadBalanceType);
consumerConfig.setBroadcastGroup(broadcastGroup);
Expand Down Expand Up @@ -193,6 +195,14 @@ public boolean isFailover() {
return failover;
}

public void setForceAck(boolean forceAck) {
this.forceAck = forceAck;
}

public boolean isForceAck() {
return forceAck;
}

public void setLoadBalance(boolean loadBalance) {
this.loadBalance = loadBalance;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package org.joyqueue.client.internal.consumer.support;

import org.apache.commons.collections.CollectionUtils;
import org.joyqueue.client.internal.consumer.BatchMessageListener;
import org.joyqueue.client.internal.consumer.config.ConsumerConfig;
import org.joyqueue.client.internal.consumer.converter.ConsumeMessageConverter;
Expand All @@ -27,7 +28,6 @@
import org.joyqueue.domain.ConsumerPolicy;
import org.joyqueue.network.command.RetryType;
import org.joyqueue.toolkit.time.SystemClock;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -82,7 +82,11 @@ public List<ConsumeReply> invoke(ConsumeContext context) {
if (logger.isDebugEnabled()) {
logger.debug("execute batchMessageListener, ignore ack, topic: {}, messages: {}, listeners: {}", topicMetadata.getTopic(), messages, listeners);
}
retryType = RetryType.OTHER;
if (config.isForceAck()) {
retryType = RetryType.OTHER;
} else {
retryType = RetryType.NONE;
}
} else {
logger.error("execute batchMessageListener exception, topic: {}, messages: {}, listeners: {}", topicMetadata.getTopic(), messages, listeners, e);
retryType = RetryType.EXCEPTION;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,14 @@ public List<ConsumeReply> invoke(ConsumeContext context) {
}
} catch (Exception e) {
if (e instanceof IgnoreAckException) {
logger.debug("execute messageListener, ignore ack, topic: {}, message: {}, listeners: {}", topicMetadata.getTopic(), message, listeners);
retryType = RetryType.OTHER;
if (logger.isDebugEnabled()) {
logger.debug("execute messageListener, ignore ack, topic: {}, message: {}, listeners: {}", topicMetadata.getTopic(), message, listeners);
}
if (config.isForceAck()) {
retryType = RetryType.OTHER;
} else {
retryType = RetryType.NONE;
}
} else {
logger.error("execute messageListener exception, topic: {}, message: {}, listeners: {}", topicMetadata.getTopic(), message, listeners, e);
retryType = RetryType.EXCEPTION;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.joyqueue.client.internal.metadata.domain.ClusterMetadata;
import org.joyqueue.client.internal.metadata.domain.PartitionGroupMetadata;
import org.joyqueue.client.internal.metadata.domain.PartitionMetadata;
Expand All @@ -27,11 +30,10 @@
import org.joyqueue.network.command.TopicPartition;
import org.joyqueue.network.command.TopicPartitionGroup;
import org.joyqueue.network.domain.BrokerNode;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;

import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* ClusterMetadataConverter
Expand Down Expand Up @@ -66,6 +68,10 @@ public static TopicMetadata convertTopicMetadata(String code, Topic topic, Map<I
Map<Integer, PartitionGroupMetadata> partitionGroupMap = Maps.newHashMap();
List<BrokerNode> brokers = Lists.newArrayList();
List<BrokerNode> nearbyBrokers = Lists.newArrayList();
Set<BrokerNode> writableBrokers = Sets.newHashSet();
Set<BrokerNode> readableBrokers = Sets.newHashSet();
Set<BrokerNode> writableNearbyBrokers = Sets.newHashSet();
Set<BrokerNode> readableNearbyBrokers = Sets.newHashSet();
Map<Integer, List<PartitionMetadata>> brokerPartitions = Maps.newHashMap();
Map<Integer, List<PartitionGroupMetadata>> brokerPartitionGroups = Maps.newHashMap();
boolean allAvailable = true;
Expand All @@ -83,19 +89,31 @@ public static TopicMetadata convertTopicMetadata(String code, Topic topic, Map<I
partitionGroups.add(partitionGroupMetadata);
partitionGroupMap.put(entry.getKey(), partitionGroupMetadata);

if (partitionGroupMetadata.getLeader() == null) {
BrokerNode leader = partitionGroupMetadata.getLeader();

if (leader == null) {
allAvailable = false;
} else {
if (!partitionGroupMetadata.getLeader().isWritable() || !partitionGroupMetadata.getLeader().isReadable()) {
allAvailable = false;
}

List<PartitionGroupMetadata> brokerPartitionGroupList = brokerPartitionGroups.get(partitionGroupMetadata.getLeader().getId());
List<PartitionGroupMetadata> brokerPartitionGroupList = brokerPartitionGroups.get(leader.getId());
if (brokerPartitionGroupList == null) {
brokerPartitionGroupList = Lists.newArrayList();
brokerPartitionGroups.put(partitionGroupMetadata.getLeader().getId(), brokerPartitionGroupList);
brokerPartitionGroups.put(leader.getId(), brokerPartitionGroupList);
}
brokerPartitionGroupList.add(partitionGroupMetadata);

if (leader.isWritable()) {
writableBrokers.add(leader);
if (leader.isNearby()) {
writableNearbyBrokers.add(leader);
}
}

if (leader.isReadable()) {
readableBrokers.add(leader);
if (leader.isNearby()) {
readableNearbyBrokers.add(leader);
}
}
}

for (Map.Entry<Short, PartitionMetadata> partitionEntry : partitionGroupMetadata.getPartitions().entrySet()) {
Expand Down Expand Up @@ -130,7 +148,8 @@ public static TopicMetadata convertTopicMetadata(String code, Topic topic, Map<I
}

return new TopicMetadata(code, topic.getProducerPolicy(), topic.getConsumerPolicy(), topic.getType(), partitionGroups, partitions, partitionMap, partitionGroupMap, brokers,
nearbyBrokers, brokerMap, brokerPartitions, brokerPartitionGroups, allAvailable, topic.getCode());
Lists.newArrayList(writableBrokers), Lists.newArrayList(readableBrokers), nearbyBrokers, Lists.newArrayList(writableNearbyBrokers), Lists.newArrayList(readableNearbyBrokers),
brokerMap, brokerPartitions, brokerPartitionGroups, allAvailable, topic.getCode());
}

public static PartitionGroupMetadata convertPartitionGroupMetadata(String topic, TopicPartitionGroup partitionGroup, Map<Integer, BrokerNode> brokers) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package org.joyqueue.client.internal.metadata.domain;

/**
* PartitionNode
* author: gaohaoxiang
* date: 2020/8/11
*/
public class PartitionNode {

private PartitionMetadata partitionMetadata;

public PartitionNode(PartitionMetadata partitionMetadata) {
this.partitionMetadata = partitionMetadata;
}

public PartitionNodeTracer begin() {
return new PartitionNodeTracer();
}

public PartitionMetadata getPartitionMetadata() {
return partitionMetadata;
}

public static class PartitionNodeTracer {

public void end() {

}

public void error() {

}
}
}
Loading

0 comments on commit 0d8befd

Please sign in to comment.