From 7342eac2875b1a646197aab734d198f780086498 Mon Sep 17 00:00:00 2001 From: llIlll <10194588+llIlll@users.noreply.github.com> Date: Mon, 28 Dec 2020 15:42:51 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=A2=E6=88=B7=E7=AB=AF=E6=8B=A6=E6=88=AA?= =?UTF-8?q?=E5=99=A8=E5=85=81=E8=AE=B8=E4=BF=AE=E6=94=B9=E4=B8=BB=E9=A2=98?= =?UTF-8?q?=20(#331)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 调整拦截器,可以动态修改生产主题 * 调整客户端元数据,返回策略字段 * 添加报错信息 --- .../consumer/interceptor/ConsumeContext.java | 11 +++++- .../interceptor/ConsumerInvocation.java | 9 +++-- .../TopicMessageConsumerDispatcher.java | 4 +- .../converter/ClusterMetadataConverter.java | 2 +- .../metadata/domain/TopicMetadata.java | 8 +++- .../producer/interceptor/ProduceContext.java | 17 +++++--- .../interceptor/ProducerInvocation.java | 2 +- .../support/MessageProducerInner.java | 26 +++++++++++-- .../interceptor/TraceProducerInterceptor.java | 2 +- .../client/internal/transport/Client.java | 2 +- .../codec/FetchClusterResponseCodec.java | 39 +++++++++++++++---- .../org/joyqueue/network/command/Topic.java | 10 +++++ .../handler/FetchClusterRequestHandler.java | 3 ++ 13 files changed, 107 insertions(+), 28 deletions(-) diff --git a/joyqueue-client/joyqueue-client-core/src/main/java/org/joyqueue/client/internal/consumer/interceptor/ConsumeContext.java b/joyqueue-client/joyqueue-client-core/src/main/java/org/joyqueue/client/internal/consumer/interceptor/ConsumeContext.java index ea0c0393c..88eb57905 100644 --- a/joyqueue-client/joyqueue-client-core/src/main/java/org/joyqueue/client/internal/consumer/interceptor/ConsumeContext.java +++ b/joyqueue-client/joyqueue-client-core/src/main/java/org/joyqueue/client/internal/consumer/interceptor/ConsumeContext.java @@ -19,6 +19,7 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; import org.joyqueue.client.internal.consumer.domain.ConsumeMessage; +import org.joyqueue.client.internal.metadata.domain.TopicMetadata; import org.joyqueue.client.internal.nameserver.NameServerConfig; import java.util.List; @@ -36,14 +37,16 @@ public class ConsumeContext { private String topic; private String app; private NameServerConfig nameserver; + private TopicMetadata topicMetadata; private List messages; private Map attributes; private Set messageFilter; - public ConsumeContext(String topic, String app, NameServerConfig nameserver, List messages) { + public ConsumeContext(String topic, String app, NameServerConfig nameserverConfig, TopicMetadata topicMetadata, List messages) { this.topic = topic; this.app = app; - this.nameserver = nameserver; + this.nameserver = nameserverConfig; + this.topicMetadata = topicMetadata; this.messages = messages; } @@ -59,6 +62,10 @@ public NameServerConfig getNameserver() { return nameserver; } + public TopicMetadata getTopicMetadata() { + return topicMetadata; + } + public List getMessages() { return messages; } diff --git a/joyqueue-client/joyqueue-client-core/src/main/java/org/joyqueue/client/internal/consumer/interceptor/ConsumerInvocation.java b/joyqueue-client/joyqueue-client-core/src/main/java/org/joyqueue/client/internal/consumer/interceptor/ConsumerInvocation.java index a502114bc..83a5a2b21 100644 --- a/joyqueue-client/joyqueue-client-core/src/main/java/org/joyqueue/client/internal/consumer/interceptor/ConsumerInvocation.java +++ b/joyqueue-client/joyqueue-client-core/src/main/java/org/joyqueue/client/internal/consumer/interceptor/ConsumerInvocation.java @@ -15,11 +15,12 @@ */ package org.joyqueue.client.internal.consumer.interceptor; +import org.apache.commons.collections.CollectionUtils; import org.joyqueue.client.internal.consumer.config.ConsumerConfig; import org.joyqueue.client.internal.consumer.domain.ConsumeMessage; import org.joyqueue.client.internal.consumer.domain.ConsumeReply; +import org.joyqueue.client.internal.metadata.domain.TopicMetadata; import org.joyqueue.client.internal.nameserver.NameServerConfig; -import org.apache.commons.collections.CollectionUtils; import java.util.Collections; import java.util.List; @@ -35,22 +36,24 @@ public class ConsumerInvocation { private ConsumerConfig config; private String topic; private NameServerConfig nameServerConfig; + private TopicMetadata topicMetadata; private List messages; private ConsumerInterceptorManager consumerInterceptorManager; private ConsumerInvoker consumerInvoker; - public ConsumerInvocation(ConsumerConfig config, String topic, NameServerConfig nameServerConfig, List messages, + public ConsumerInvocation(ConsumerConfig config, String topic, NameServerConfig nameServerConfig, TopicMetadata topicMetadata, List messages, ConsumerInterceptorManager consumerInterceptorManager, ConsumerInvoker consumerInvoker) { this.config = config; this.topic = topic; this.nameServerConfig = nameServerConfig; + this.topicMetadata = topicMetadata; this.messages = messages; this.consumerInterceptorManager = consumerInterceptorManager; this.consumerInvoker = consumerInvoker; } public List invoke() { - ConsumeContext context = new ConsumeContext(topic, config.getApp(), nameServerConfig, Collections.unmodifiableList(messages)); + ConsumeContext context = new ConsumeContext(topic, config.getApp(), nameServerConfig, topicMetadata, Collections.unmodifiableList(messages)); List interceptors = consumerInterceptorManager.getSortedInterceptors(); if (CollectionUtils.isEmpty(interceptors)) { diff --git a/joyqueue-client/joyqueue-client-core/src/main/java/org/joyqueue/client/internal/consumer/support/TopicMessageConsumerDispatcher.java b/joyqueue-client/joyqueue-client-core/src/main/java/org/joyqueue/client/internal/consumer/support/TopicMessageConsumerDispatcher.java index 8717e457f..5fdfbd11a 100644 --- a/joyqueue-client/joyqueue-client-core/src/main/java/org/joyqueue/client/internal/consumer/support/TopicMessageConsumerDispatcher.java +++ b/joyqueue-client/joyqueue-client-core/src/main/java/org/joyqueue/client/internal/consumer/support/TopicMessageConsumerDispatcher.java @@ -109,12 +109,12 @@ protected List doDispatch(TopicMetadata topicMetadata, ConsumerPol protected List doBatchDispatch(TopicMetadata topicMetadata, ConsumerPolicy consumerPolicy, List messages, List listeners) { - return new ConsumerInvocation(config, topic, nameServerConfig, messages, consumerInterceptorManager, + return new ConsumerInvocation(config, topic, nameServerConfig, topicMetadata, messages, consumerInterceptorManager, new BatchConsumerInvoker(config, topicMetadata, consumerPolicy, messages, listeners)).invoke(); } protected List doOnceDispatch(TopicMetadata topicMetadata, final ConsumerPolicy consumerPolicy, final List messages, final List listeners) { - return new ConsumerInvocation(config, topic, nameServerConfig, messages, consumerInterceptorManager, + return new ConsumerInvocation(config, topic, nameServerConfig, topicMetadata, messages, consumerInterceptorManager, new OnceConsumerInvoker(config, topicMetadata, consumerPolicy, messages, listeners)).invoke(); } } \ No newline at end of file diff --git a/joyqueue-client/joyqueue-client-core/src/main/java/org/joyqueue/client/internal/metadata/converter/ClusterMetadataConverter.java b/joyqueue-client/joyqueue-client-core/src/main/java/org/joyqueue/client/internal/metadata/converter/ClusterMetadataConverter.java index 58fb36915..ded52ae23 100644 --- a/joyqueue-client/joyqueue-client-core/src/main/java/org/joyqueue/client/internal/metadata/converter/ClusterMetadataConverter.java +++ b/joyqueue-client/joyqueue-client-core/src/main/java/org/joyqueue/client/internal/metadata/converter/ClusterMetadataConverter.java @@ -149,7 +149,7 @@ public static TopicMetadata convertTopicMetadata(String code, Topic topic, Map brokers) { diff --git a/joyqueue-client/joyqueue-client-core/src/main/java/org/joyqueue/client/internal/metadata/domain/TopicMetadata.java b/joyqueue-client/joyqueue-client-core/src/main/java/org/joyqueue/client/internal/metadata/domain/TopicMetadata.java index c0d0d345e..2785cc274 100644 --- a/joyqueue-client/joyqueue-client-core/src/main/java/org/joyqueue/client/internal/metadata/domain/TopicMetadata.java +++ b/joyqueue-client/joyqueue-client-core/src/main/java/org/joyqueue/client/internal/metadata/domain/TopicMetadata.java @@ -39,6 +39,7 @@ public class TopicMetadata implements Serializable { private ProducerPolicy producerPolicy; private ConsumerPolicy consumerPolicy; private TopicType type; + private Map params; private JoyQueueCode code; private List partitionGroups; @@ -76,7 +77,7 @@ public TopicMetadata(String topic, ProducerPolicy producerPolicy, ConsumerPolicy List partitions, Map partitionMap, Map partitionGroupMap, List brokers, List writableBrokers, List readableBrokers, List nearbyBrokers, List writableNearbyBrokers, List readableNearbyBrokers, Map brokerMap, Map> brokerPartitions, - Map> brokerPartitionGroups, boolean allAvailable, JoyQueueCode code) { + Map> brokerPartitionGroups, boolean allAvailable, Map params, JoyQueueCode code) { this.topic = topic; this.producerPolicy = producerPolicy; this.consumerPolicy = consumerPolicy; @@ -95,6 +96,7 @@ public TopicMetadata(String topic, ProducerPolicy producerPolicy, ConsumerPolicy this.brokerPartitions = brokerPartitions; this.brokerPartitionGroups = brokerPartitionGroups; this.allAvailable = allAvailable; + this.params = params; this.code = code; } @@ -174,6 +176,10 @@ public JoyQueueCode getCode() { return code; } + public Map getParams() { + return params; + } + public void setAttachments(ConcurrentMap attachments) { this.attachments = attachments; } diff --git a/joyqueue-client/joyqueue-client-core/src/main/java/org/joyqueue/client/internal/producer/interceptor/ProduceContext.java b/joyqueue-client/joyqueue-client-core/src/main/java/org/joyqueue/client/internal/producer/interceptor/ProduceContext.java index e49dd7039..36aaa843d 100644 --- a/joyqueue-client/joyqueue-client-core/src/main/java/org/joyqueue/client/internal/producer/interceptor/ProduceContext.java +++ b/joyqueue-client/joyqueue-client-core/src/main/java/org/joyqueue/client/internal/producer/interceptor/ProduceContext.java @@ -16,6 +16,7 @@ package org.joyqueue.client.internal.producer.interceptor; import com.google.common.collect.Maps; +import org.joyqueue.client.internal.metadata.domain.TopicMetadata; import org.joyqueue.client.internal.nameserver.NameServerConfig; import org.joyqueue.client.internal.producer.domain.ProduceMessage; @@ -32,14 +33,16 @@ public class ProduceContext { private String topic; private String app; - private NameServerConfig nameserver; + private NameServerConfig nameserverConfig; + private TopicMetadata topicMetadata; private List messages; private Map attributes; - public ProduceContext(String topic, String app, NameServerConfig nameserver, List messages) { + public ProduceContext(String topic, String app, NameServerConfig nameserverConfig, TopicMetadata topicMetadata, List messages) { this.topic = topic; this.app = app; - this.nameserver = nameserver; + this.nameserverConfig = nameserverConfig; + this.topicMetadata = topicMetadata; this.messages = messages; } @@ -51,14 +54,18 @@ public String getApp() { return app; } - public NameServerConfig getNameserver() { - return nameserver; + public NameServerConfig getNameserverConfig() { + return nameserverConfig; } public List getMessages() { return messages; } + public TopicMetadata getTopicMetadata() { + return topicMetadata; + } + public T getAttribute(Object key) { if (attributes == null) { return null; diff --git a/joyqueue-client/joyqueue-client-core/src/main/java/org/joyqueue/client/internal/producer/interceptor/ProducerInvocation.java b/joyqueue-client/joyqueue-client-core/src/main/java/org/joyqueue/client/internal/producer/interceptor/ProducerInvocation.java index 621c74c46..10306603b 100644 --- a/joyqueue-client/joyqueue-client-core/src/main/java/org/joyqueue/client/internal/producer/interceptor/ProducerInvocation.java +++ b/joyqueue-client/joyqueue-client-core/src/main/java/org/joyqueue/client/internal/producer/interceptor/ProducerInvocation.java @@ -51,7 +51,7 @@ public ProducerInvocation(ProducerConfig config, NameServerConfig nameServerConf } public List invoke() { - ProduceContext context = new ProduceContext(topicMetadata.getTopic(), config.getApp(), nameServerConfig, Collections.unmodifiableList(messages)); + ProduceContext context = new ProduceContext(topicMetadata.getTopic(), config.getApp(), nameServerConfig, topicMetadata, Collections.unmodifiableList(messages)); List interceptors = producerInterceptorManager.getSortedInterceptors(); if (CollectionUtils.isEmpty(interceptors)) { diff --git a/joyqueue-client/joyqueue-client-core/src/main/java/org/joyqueue/client/internal/producer/support/MessageProducerInner.java b/joyqueue-client/joyqueue-client-core/src/main/java/org/joyqueue/client/internal/producer/support/MessageProducerInner.java index ba89d5f74..3ce3ac54e 100644 --- a/joyqueue-client/joyqueue-client-core/src/main/java/org/joyqueue/client/internal/producer/support/MessageProducerInner.java +++ b/joyqueue-client/joyqueue-client-core/src/main/java/org/joyqueue/client/internal/producer/support/MessageProducerInner.java @@ -122,12 +122,30 @@ public List batchSend(List messages, String txId, lo return doBatchSend(messages, txId, timeout, timeoutUnit, isOneway, failover, callback); } - public List doBatchSend(List messages, String txId, long timeout, TimeUnit timeoutUnit, boolean isOneway, boolean failover, AsyncBatchProduceCallback callback) { + protected List doBatchSend(List messages, String txId, long timeout, TimeUnit timeoutUnit, boolean isOneway, boolean failover, AsyncBatchProduceCallback callback) { TopicMetadata topicMetadata = getAndCheckTopicMetadata(messages.get(0).getTopic()); - List brokerNodes = getAvailableBrokers(topicMetadata); - return doBatchSend(messages, topicMetadata, brokerNodes, - txId, timeout, timeoutUnit, isOneway, failover, callback); + try { + return new ProducerInvocation(config, nameServerConfig, topicMetadata, messages, producerInterceptorManager, new ProducerInvoker() { + @Override + public List invoke(ProduceContext context) { + TopicMetadata topicMetadata = getAndCheckTopicMetadata(messages.get(0).getTopic()); + List brokerNodes = getAvailableBrokers(topicMetadata); + return doBatchSendInternal(messages, topicMetadata, brokerNodes, txId, timeout, timeoutUnit, isOneway, failover, callback); + } + + @Override + public List reject(ProduceContext context) { + throw new ProducerException("reject send", JoyQueueCode.CN_UNKNOWN_ERROR.getCode()); + } + }).invoke(); + } catch (Exception e) { + if (e instanceof ProducerException) { + throw (ProducerException) e; + } else { + throw new ProducerException(e); + } + } } public List doBatchSend(List messages, TopicMetadata topicMetadata, List brokers, diff --git a/joyqueue-client/joyqueue-client-core/src/main/java/org/joyqueue/client/internal/trace/interceptor/TraceProducerInterceptor.java b/joyqueue-client/joyqueue-client-core/src/main/java/org/joyqueue/client/internal/trace/interceptor/TraceProducerInterceptor.java index c3ae92b33..e25098479 100644 --- a/joyqueue-client/joyqueue-client-core/src/main/java/org/joyqueue/client/internal/trace/interceptor/TraceProducerInterceptor.java +++ b/joyqueue-client/joyqueue-client-core/src/main/java/org/joyqueue/client/internal/trace/interceptor/TraceProducerInterceptor.java @@ -40,7 +40,7 @@ public boolean preSend(ProduceContext context) { TraceCaller caller = TraceBuilder.newInstance() .topic(context.getTopic()) .app(context.getApp()) - .namespace(context.getNameserver().getNamespace()) + .namespace(context.getNameserverConfig().getNamespace()) .type(TraceType.PRODUCER_SEND) .begin(); context.putAttribute(CALLER_KEY, caller); diff --git a/joyqueue-client/joyqueue-client-core/src/main/java/org/joyqueue/client/internal/transport/Client.java b/joyqueue-client/joyqueue-client-core/src/main/java/org/joyqueue/client/internal/transport/Client.java index 03c06be0c..abe633a23 100644 --- a/joyqueue-client/joyqueue-client-core/src/main/java/org/joyqueue/client/internal/transport/Client.java +++ b/joyqueue-client/joyqueue-client-core/src/main/java/org/joyqueue/client/internal/transport/Client.java @@ -128,7 +128,7 @@ public Command sync(Command request, long timeout) { } if (!response.isSuccess()) { - throw new ClientException(response.getHeader().getError(), response.getHeader().getStatus()); + throw new ClientException(transport.remoteAddress() + ":" + response.getHeader().getError(), response.getHeader().getStatus()); } return response; } diff --git a/joyqueue-common/joyqueue-network/src/main/java/org/joyqueue/network/codec/FetchClusterResponseCodec.java b/joyqueue-common/joyqueue-network/src/main/java/org/joyqueue/network/codec/FetchClusterResponseCodec.java index 80c7d3e9e..e38c18414 100644 --- a/joyqueue-common/joyqueue-network/src/main/java/org/joyqueue/network/codec/FetchClusterResponseCodec.java +++ b/joyqueue-common/joyqueue-network/src/main/java/org/joyqueue/network/codec/FetchClusterResponseCodec.java @@ -17,6 +17,9 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import io.netty.buffer.ByteBuf; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; import org.joyqueue.domain.ConsumerPolicy; import org.joyqueue.domain.ProducerPolicy; import org.joyqueue.domain.TopicType; @@ -32,9 +35,6 @@ import org.joyqueue.network.transport.codec.PayloadCodec; import org.joyqueue.network.transport.command.Header; import org.joyqueue.network.transport.command.Type; -import io.netty.buffer.ByteBuf; -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.collections.MapUtils; import java.util.Map; import java.util.Set; @@ -57,7 +57,7 @@ public FetchClusterResponse decode(JoyQueueHeader header, ByteBuf buffer) throws short topicSize = buffer.readShort(); for (int i = 0; i < topicSize; i++) { - Topic topic = decodeTopic(buffer); + Topic topic = decodeTopic(header, buffer); topics.put(topic.getTopic(), topic); } @@ -72,7 +72,7 @@ public FetchClusterResponse decode(JoyQueueHeader header, ByteBuf buffer) throws return fetchClusterResponse; } - protected Topic decodeTopic(ByteBuf buffer) throws Exception { + protected Topic decodeTopic(JoyQueueHeader header, ByteBuf buffer) throws Exception { String topicCode = Serializer.readString(buffer, Serializer.SHORT_SIZE); Topic topic = new Topic(); topic.setTopic(topicCode); @@ -150,6 +150,19 @@ protected Topic decodeTopic(ByteBuf buffer) throws Exception { topic.setPartitionGroups(partitionGroups); topic.setCode(JoyQueueCode.valueOf(buffer.readInt())); + + if (header.getVersion() >= JoyQueueHeader.VERSION_V4) { + int paramSize = buffer.readShort(); + if (paramSize > 0) { + Map params = Maps.newHashMap(); + for (int i = 0; i < paramSize; i++) { + String key = Serializer.readString(buffer, Serializer.SHORT_SIZE); + String value = Serializer.readString(buffer, Serializer.SHORT_SIZE); + params.put(key, value); + } + topic.setParams(params); + } + } return topic; } @@ -173,7 +186,7 @@ protected BrokerNode decodeBroker(JoyQueueHeader header, ByteBuf buffer) throws public void encode(FetchClusterResponse payload, ByteBuf buffer) throws Exception { buffer.writeShort(payload.getTopics().size()); for (Map.Entry entry : payload.getTopics().entrySet()) { - encodeTopic(entry.getValue(), buffer); + encodeTopic(payload.getHeader(), entry.getValue(), buffer); } buffer.writeShort(payload.getBrokers().size()); @@ -182,7 +195,7 @@ public void encode(FetchClusterResponse payload, ByteBuf buffer) throws Exceptio } } - protected void encodeTopic(Topic topic, ByteBuf buffer) throws Exception { + protected void encodeTopic(Header header, Topic topic, ByteBuf buffer) throws Exception { ProducerPolicy producerPolicy = topic.getProducerPolicy(); ConsumerPolicy consumerPolicy = topic.getConsumerPolicy(); Serializer.write(topic.getTopic(), buffer, Serializer.SHORT_SIZE); @@ -269,6 +282,18 @@ protected void encodeTopic(Topic topic, ByteBuf buffer) throws Exception { } buffer.writeInt(topic.getCode().getCode()); + + if (header.getVersion() >= JoyQueueHeader.VERSION_V4 && buffer.isReadable()) { + if (MapUtils.isEmpty(topic.getParams())) { + buffer.writeShort(0); + } else { + buffer.writeShort(topic.getParams().size()); + for (Map.Entry entry : topic.getParams().entrySet()) { + Serializer.write(entry.getKey(), buffer, Serializer.SHORT_SIZE); + Serializer.write(entry.getValue(), buffer, Serializer.SHORT_SIZE); + } + } + } } protected void encodeBroker(Header header, BrokerNode brokerNode, ByteBuf buffer) throws Exception { diff --git a/joyqueue-common/joyqueue-network/src/main/java/org/joyqueue/network/command/Topic.java b/joyqueue-common/joyqueue-network/src/main/java/org/joyqueue/network/command/Topic.java index 9245597fb..d07598ace 100644 --- a/joyqueue-common/joyqueue-network/src/main/java/org/joyqueue/network/command/Topic.java +++ b/joyqueue-common/joyqueue-network/src/main/java/org/joyqueue/network/command/Topic.java @@ -36,6 +36,7 @@ public class Topic implements Serializable { private ConsumerPolicy consumerPolicy; private TopicType type; private Map partitionGroups; + private Map params; private JoyQueueCode code; public void setTopic(String topic) { @@ -78,6 +79,14 @@ public void setPartitionGroups(Map partitionGroups this.partitionGroups = partitionGroups; } + public Map getParams() { + return params; + } + + public void setParams(Map params) { + this.params = params; + } + public void setCode(JoyQueueCode code) { this.code = code; } @@ -94,6 +103,7 @@ public String toString() { ", consumerPolicy=" + consumerPolicy + ", type=" + type + ", partitionGroups=" + partitionGroups + + ", params=" + params + ", code=" + code + '}'; } diff --git a/joyqueue-server/joyqueue-broker-protocol/src/main/java/org/joyqueue/broker/protocol/handler/FetchClusterRequestHandler.java b/joyqueue-server/joyqueue-broker-protocol/src/main/java/org/joyqueue/broker/protocol/handler/FetchClusterRequestHandler.java index db3c3ac55..51a91b2b3 100644 --- a/joyqueue-server/joyqueue-broker-protocol/src/main/java/org/joyqueue/broker/protocol/handler/FetchClusterRequestHandler.java +++ b/joyqueue-server/joyqueue-broker-protocol/src/main/java/org/joyqueue/broker/protocol/handler/FetchClusterRequestHandler.java @@ -154,6 +154,9 @@ protected Topic getTopicMetadata(Connection connection, String topic, String app result.setCode(JoyQueueCode.SUCCESS); result.setPartitionGroups(convertTopicPartitionGroups(connection, topicConfig.getPartitionGroups().values(), brokers)); + if (topicConfig.getPolicy() != null) { + result.setParams(topicConfig.getPolicy().getParams()); + } return result; }