Skip to content

Commit

Permalink
客户端拦截器允许修改主题 (#331)
Browse files Browse the repository at this point in the history
* 调整拦截器,可以动态修改生产主题
* 调整客户端元数据,返回策略字段
* 添加报错信息
  • Loading branch information
llIlll committed Dec 28, 2020
1 parent 81b294a commit 7342eac
Show file tree
Hide file tree
Showing 13 changed files with 107 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.joyqueue.client.internal.consumer.domain.ConsumeMessage;
import org.joyqueue.client.internal.metadata.domain.TopicMetadata;
import org.joyqueue.client.internal.nameserver.NameServerConfig;

import java.util.List;
Expand All @@ -36,14 +37,16 @@ public class ConsumeContext {
private String topic;
private String app;
private NameServerConfig nameserver;
private TopicMetadata topicMetadata;
private List<ConsumeMessage> messages;
private Map<Object, Object> attributes;
private Set<ConsumeMessage> messageFilter;

public ConsumeContext(String topic, String app, NameServerConfig nameserver, List<ConsumeMessage> messages) {
public ConsumeContext(String topic, String app, NameServerConfig nameserverConfig, TopicMetadata topicMetadata, List<ConsumeMessage> messages) {
this.topic = topic;
this.app = app;
this.nameserver = nameserver;
this.nameserver = nameserverConfig;
this.topicMetadata = topicMetadata;
this.messages = messages;
}

Expand All @@ -59,6 +62,10 @@ public NameServerConfig getNameserver() {
return nameserver;
}

public TopicMetadata getTopicMetadata() {
return topicMetadata;
}

public List<ConsumeMessage> getMessages() {
return messages;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@
*/
package org.joyqueue.client.internal.consumer.interceptor;

import org.apache.commons.collections.CollectionUtils;
import org.joyqueue.client.internal.consumer.config.ConsumerConfig;
import org.joyqueue.client.internal.consumer.domain.ConsumeMessage;
import org.joyqueue.client.internal.consumer.domain.ConsumeReply;
import org.joyqueue.client.internal.metadata.domain.TopicMetadata;
import org.joyqueue.client.internal.nameserver.NameServerConfig;
import org.apache.commons.collections.CollectionUtils;

import java.util.Collections;
import java.util.List;
Expand All @@ -35,22 +36,24 @@ public class ConsumerInvocation {
private ConsumerConfig config;
private String topic;
private NameServerConfig nameServerConfig;
private TopicMetadata topicMetadata;
private List<ConsumeMessage> messages;
private ConsumerInterceptorManager consumerInterceptorManager;
private ConsumerInvoker consumerInvoker;

public ConsumerInvocation(ConsumerConfig config, String topic, NameServerConfig nameServerConfig, List<ConsumeMessage> messages,
public ConsumerInvocation(ConsumerConfig config, String topic, NameServerConfig nameServerConfig, TopicMetadata topicMetadata, List<ConsumeMessage> messages,
ConsumerInterceptorManager consumerInterceptorManager, ConsumerInvoker consumerInvoker) {
this.config = config;
this.topic = topic;
this.nameServerConfig = nameServerConfig;
this.topicMetadata = topicMetadata;
this.messages = messages;
this.consumerInterceptorManager = consumerInterceptorManager;
this.consumerInvoker = consumerInvoker;
}

public List<ConsumeReply> invoke() {
ConsumeContext context = new ConsumeContext(topic, config.getApp(), nameServerConfig, Collections.unmodifiableList(messages));
ConsumeContext context = new ConsumeContext(topic, config.getApp(), nameServerConfig, topicMetadata, Collections.unmodifiableList(messages));
List<ConsumerInterceptor> interceptors = consumerInterceptorManager.getSortedInterceptors();

if (CollectionUtils.isEmpty(interceptors)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,12 @@ protected List<ConsumeReply> doDispatch(TopicMetadata topicMetadata, ConsumerPol

protected List<ConsumeReply> doBatchDispatch(TopicMetadata topicMetadata, ConsumerPolicy consumerPolicy,
List<ConsumeMessage> messages, List<BatchMessageListener> listeners) {
return new ConsumerInvocation(config, topic, nameServerConfig, messages, consumerInterceptorManager,
return new ConsumerInvocation(config, topic, nameServerConfig, topicMetadata, messages, consumerInterceptorManager,
new BatchConsumerInvoker(config, topicMetadata, consumerPolicy, messages, listeners)).invoke();
}

protected List<ConsumeReply> doOnceDispatch(TopicMetadata topicMetadata, final ConsumerPolicy consumerPolicy, final List<ConsumeMessage> messages, final List<MessageListener> listeners) {
return new ConsumerInvocation(config, topic, nameServerConfig, messages, consumerInterceptorManager,
return new ConsumerInvocation(config, topic, nameServerConfig, topicMetadata, messages, consumerInterceptorManager,
new OnceConsumerInvoker(config, topicMetadata, consumerPolicy, messages, listeners)).invoke();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public static TopicMetadata convertTopicMetadata(String code, Topic topic, Map<I

return new TopicMetadata(code, topic.getProducerPolicy(), topic.getConsumerPolicy(), topic.getType(), partitionGroups, partitions, partitionMap, partitionGroupMap, brokers,
Lists.newArrayList(writableBrokers), Lists.newArrayList(readableBrokers), nearbyBrokers, Lists.newArrayList(writableNearbyBrokers), Lists.newArrayList(readableNearbyBrokers),
brokerMap, brokerPartitions, brokerPartitionGroups, allAvailable, topic.getCode());
brokerMap, brokerPartitions, brokerPartitionGroups, allAvailable, topic.getParams(), topic.getCode());
}

public static PartitionGroupMetadata convertPartitionGroupMetadata(String topic, TopicPartitionGroup partitionGroup, Map<Integer, BrokerNode> brokers) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class TopicMetadata implements Serializable {
private ProducerPolicy producerPolicy;
private ConsumerPolicy consumerPolicy;
private TopicType type;
private Map<String, String> params;
private JoyQueueCode code;

private List<PartitionGroupMetadata> partitionGroups;
Expand Down Expand Up @@ -76,7 +77,7 @@ public TopicMetadata(String topic, ProducerPolicy producerPolicy, ConsumerPolicy
List<PartitionMetadata> partitions, Map<Short, PartitionMetadata> partitionMap, Map<Integer, PartitionGroupMetadata> partitionGroupMap, List<BrokerNode> brokers,
List<BrokerNode> writableBrokers, List<BrokerNode> readableBrokers, List<BrokerNode> nearbyBrokers, List<BrokerNode> writableNearbyBrokers,
List<BrokerNode> readableNearbyBrokers, Map<Integer, BrokerNode> brokerMap, Map<Integer, List<PartitionMetadata>> brokerPartitions,
Map<Integer, List<PartitionGroupMetadata>> brokerPartitionGroups, boolean allAvailable, JoyQueueCode code) {
Map<Integer, List<PartitionGroupMetadata>> brokerPartitionGroups, boolean allAvailable, Map<String, String> params, JoyQueueCode code) {
this.topic = topic;
this.producerPolicy = producerPolicy;
this.consumerPolicy = consumerPolicy;
Expand All @@ -95,6 +96,7 @@ public TopicMetadata(String topic, ProducerPolicy producerPolicy, ConsumerPolicy
this.brokerPartitions = brokerPartitions;
this.brokerPartitionGroups = brokerPartitionGroups;
this.allAvailable = allAvailable;
this.params = params;
this.code = code;
}

Expand Down Expand Up @@ -174,6 +176,10 @@ public JoyQueueCode getCode() {
return code;
}

public Map<String, String> getParams() {
return params;
}

public void setAttachments(ConcurrentMap<Object, Object> attachments) {
this.attachments = attachments;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package org.joyqueue.client.internal.producer.interceptor;

import com.google.common.collect.Maps;
import org.joyqueue.client.internal.metadata.domain.TopicMetadata;
import org.joyqueue.client.internal.nameserver.NameServerConfig;
import org.joyqueue.client.internal.producer.domain.ProduceMessage;

Expand All @@ -32,14 +33,16 @@ public class ProduceContext {

private String topic;
private String app;
private NameServerConfig nameserver;
private NameServerConfig nameserverConfig;
private TopicMetadata topicMetadata;
private List<ProduceMessage> messages;
private Map<Object, Object> attributes;

public ProduceContext(String topic, String app, NameServerConfig nameserver, List<ProduceMessage> messages) {
public ProduceContext(String topic, String app, NameServerConfig nameserverConfig, TopicMetadata topicMetadata, List<ProduceMessage> messages) {
this.topic = topic;
this.app = app;
this.nameserver = nameserver;
this.nameserverConfig = nameserverConfig;
this.topicMetadata = topicMetadata;
this.messages = messages;
}

Expand All @@ -51,14 +54,18 @@ public String getApp() {
return app;
}

public NameServerConfig getNameserver() {
return nameserver;
public NameServerConfig getNameserverConfig() {
return nameserverConfig;
}

public List<ProduceMessage> getMessages() {
return messages;
}

public TopicMetadata getTopicMetadata() {
return topicMetadata;
}

public <T> T getAttribute(Object key) {
if (attributes == null) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public ProducerInvocation(ProducerConfig config, NameServerConfig nameServerConf
}

public List<SendResult> invoke() {
ProduceContext context = new ProduceContext(topicMetadata.getTopic(), config.getApp(), nameServerConfig, Collections.unmodifiableList(messages));
ProduceContext context = new ProduceContext(topicMetadata.getTopic(), config.getApp(), nameServerConfig, topicMetadata, Collections.unmodifiableList(messages));
List<ProducerInterceptor> interceptors = producerInterceptorManager.getSortedInterceptors();

if (CollectionUtils.isEmpty(interceptors)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,30 @@ public List<SendResult> batchSend(List<ProduceMessage> messages, String txId, lo
return doBatchSend(messages, txId, timeout, timeoutUnit, isOneway, failover, callback);
}

public List<SendResult> doBatchSend(List<ProduceMessage> messages, String txId, long timeout, TimeUnit timeoutUnit, boolean isOneway, boolean failover, AsyncBatchProduceCallback callback) {
protected List<SendResult> doBatchSend(List<ProduceMessage> messages, String txId, long timeout, TimeUnit timeoutUnit, boolean isOneway, boolean failover, AsyncBatchProduceCallback callback) {
TopicMetadata topicMetadata = getAndCheckTopicMetadata(messages.get(0).getTopic());
List<BrokerNode> brokerNodes = getAvailableBrokers(topicMetadata);

return doBatchSend(messages, topicMetadata, brokerNodes,
txId, timeout, timeoutUnit, isOneway, failover, callback);
try {
return new ProducerInvocation(config, nameServerConfig, topicMetadata, messages, producerInterceptorManager, new ProducerInvoker() {
@Override
public List<SendResult> invoke(ProduceContext context) {
TopicMetadata topicMetadata = getAndCheckTopicMetadata(messages.get(0).getTopic());
List<BrokerNode> brokerNodes = getAvailableBrokers(topicMetadata);
return doBatchSendInternal(messages, topicMetadata, brokerNodes, txId, timeout, timeoutUnit, isOneway, failover, callback);
}

@Override
public List<SendResult> reject(ProduceContext context) {
throw new ProducerException("reject send", JoyQueueCode.CN_UNKNOWN_ERROR.getCode());
}
}).invoke();
} catch (Exception e) {
if (e instanceof ProducerException) {
throw (ProducerException) e;
} else {
throw new ProducerException(e);
}
}
}

public List<SendResult> doBatchSend(List<ProduceMessage> messages, TopicMetadata topicMetadata, List<BrokerNode> brokers,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public boolean preSend(ProduceContext context) {
TraceCaller caller = TraceBuilder.newInstance()
.topic(context.getTopic())
.app(context.getApp())
.namespace(context.getNameserver().getNamespace())
.namespace(context.getNameserverConfig().getNamespace())
.type(TraceType.PRODUCER_SEND)
.begin();
context.putAttribute(CALLER_KEY, caller);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public Command sync(Command request, long timeout) {
}

if (!response.isSuccess()) {
throw new ClientException(response.getHeader().getError(), response.getHeader().getStatus());
throw new ClientException(transport.remoteAddress() + ":" + response.getHeader().getError(), response.getHeader().getStatus());
}
return response;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.joyqueue.domain.ConsumerPolicy;
import org.joyqueue.domain.ProducerPolicy;
import org.joyqueue.domain.TopicType;
Expand All @@ -32,9 +35,6 @@
import org.joyqueue.network.transport.codec.PayloadCodec;
import org.joyqueue.network.transport.command.Header;
import org.joyqueue.network.transport.command.Type;
import io.netty.buffer.ByteBuf;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;

import java.util.Map;
import java.util.Set;
Expand All @@ -57,7 +57,7 @@ public FetchClusterResponse decode(JoyQueueHeader header, ByteBuf buffer) throws

short topicSize = buffer.readShort();
for (int i = 0; i < topicSize; i++) {
Topic topic = decodeTopic(buffer);
Topic topic = decodeTopic(header, buffer);
topics.put(topic.getTopic(), topic);
}

Expand All @@ -72,7 +72,7 @@ public FetchClusterResponse decode(JoyQueueHeader header, ByteBuf buffer) throws
return fetchClusterResponse;
}

protected Topic decodeTopic(ByteBuf buffer) throws Exception {
protected Topic decodeTopic(JoyQueueHeader header, ByteBuf buffer) throws Exception {
String topicCode = Serializer.readString(buffer, Serializer.SHORT_SIZE);
Topic topic = new Topic();
topic.setTopic(topicCode);
Expand Down Expand Up @@ -150,6 +150,19 @@ protected Topic decodeTopic(ByteBuf buffer) throws Exception {

topic.setPartitionGroups(partitionGroups);
topic.setCode(JoyQueueCode.valueOf(buffer.readInt()));

if (header.getVersion() >= JoyQueueHeader.VERSION_V4) {
int paramSize = buffer.readShort();
if (paramSize > 0) {
Map<String, String> params = Maps.newHashMap();
for (int i = 0; i < paramSize; i++) {
String key = Serializer.readString(buffer, Serializer.SHORT_SIZE);
String value = Serializer.readString(buffer, Serializer.SHORT_SIZE);
params.put(key, value);
}
topic.setParams(params);
}
}
return topic;
}

Expand All @@ -173,7 +186,7 @@ protected BrokerNode decodeBroker(JoyQueueHeader header, ByteBuf buffer) throws
public void encode(FetchClusterResponse payload, ByteBuf buffer) throws Exception {
buffer.writeShort(payload.getTopics().size());
for (Map.Entry<String, Topic> entry : payload.getTopics().entrySet()) {
encodeTopic(entry.getValue(), buffer);
encodeTopic(payload.getHeader(), entry.getValue(), buffer);
}

buffer.writeShort(payload.getBrokers().size());
Expand All @@ -182,7 +195,7 @@ public void encode(FetchClusterResponse payload, ByteBuf buffer) throws Exceptio
}
}

protected void encodeTopic(Topic topic, ByteBuf buffer) throws Exception {
protected void encodeTopic(Header header, Topic topic, ByteBuf buffer) throws Exception {
ProducerPolicy producerPolicy = topic.getProducerPolicy();
ConsumerPolicy consumerPolicy = topic.getConsumerPolicy();
Serializer.write(topic.getTopic(), buffer, Serializer.SHORT_SIZE);
Expand Down Expand Up @@ -269,6 +282,18 @@ protected void encodeTopic(Topic topic, ByteBuf buffer) throws Exception {
}

buffer.writeInt(topic.getCode().getCode());

if (header.getVersion() >= JoyQueueHeader.VERSION_V4 && buffer.isReadable()) {
if (MapUtils.isEmpty(topic.getParams())) {
buffer.writeShort(0);
} else {
buffer.writeShort(topic.getParams().size());
for (Map.Entry<String, String> entry : topic.getParams().entrySet()) {
Serializer.write(entry.getKey(), buffer, Serializer.SHORT_SIZE);
Serializer.write(entry.getValue(), buffer, Serializer.SHORT_SIZE);
}
}
}
}

protected void encodeBroker(Header header, BrokerNode brokerNode, ByteBuf buffer) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class Topic implements Serializable {
private ConsumerPolicy consumerPolicy;
private TopicType type;
private Map<Integer, TopicPartitionGroup> partitionGroups;
private Map<String, String> params;
private JoyQueueCode code;

public void setTopic(String topic) {
Expand Down Expand Up @@ -78,6 +79,14 @@ public void setPartitionGroups(Map<Integer, TopicPartitionGroup> partitionGroups
this.partitionGroups = partitionGroups;
}

public Map<String, String> getParams() {
return params;
}

public void setParams(Map<String, String> params) {
this.params = params;
}

public void setCode(JoyQueueCode code) {
this.code = code;
}
Expand All @@ -94,6 +103,7 @@ public String toString() {
", consumerPolicy=" + consumerPolicy +
", type=" + type +
", partitionGroups=" + partitionGroups +
", params=" + params +
", code=" + code +
'}';
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,9 @@ protected Topic getTopicMetadata(Connection connection, String topic, String app

result.setCode(JoyQueueCode.SUCCESS);
result.setPartitionGroups(convertTopicPartitionGroups(connection, topicConfig.getPartitionGroups().values(), brokers));
if (topicConfig.getPolicy() != null) {
result.setParams(topicConfig.getPolicy().getParams());
}
return result;
}

Expand Down

0 comments on commit 7342eac

Please sign in to comment.