Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-270: Support provisioning of Quorum queues #271

Merged
merged 1 commit into from
Nov 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
54 changes: 54 additions & 0 deletions docs/src/main/asciidoc/overview.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,18 @@ dlqOverflowBehavior::
Action to take when `dlqMaxLength` or `dlqMaxLengthBytes` is exceeded; currently `drop-head` or `reject-publish` but refer to the RabbitMQ documentation.
+
Default: `none`
dlqQuorum.deliveryLimit::
When `quorum.enabled=true`, set a delivery limit after which the message is dropped or dead-lettered.
+
Default: none - broker default will apply.
dlqQuorum.enabled::
When true, create a quorum dead letter queue instead of a classic queue.
+
Default: false
dlqQuorum.initialQuorumSize::
When `quorum.enabled=true`, set the initial quorum size.
+
Default: none - broker default will apply.
dlqTtl::
Default time to live to apply to the dead letter queue when declared (in milliseconds).
+
Expand Down Expand Up @@ -342,6 +354,18 @@ Otherwise the queue name is `destination.group`.
This is useful, for example, when using Spring Cloud Stream to consume from an existing RabbitMQ queue.
+
Default: false.
quorum.deliveryLimit::
When `quorum.enabled=true`, set a delivery limit after which the message is dropped or dead-lettered.
+
Default: none - broker default will apply.
quorum.enabled::
When true, create a quorum queue instead of a classic queue.
+
Default: false
quorum.initialQuorumSize::
When `quorum.enabled=true`, set the initial quorum size.
+
Default: none - broker default will apply.
recoveryInterval::
The interval between connection recovery attempts, in milliseconds.
+
Expand Down Expand Up @@ -542,6 +566,21 @@ Maximum priority of messages in the dead letter queue (0-255)
Applies only when `requiredGroups` are provided and then only to those groups.
+
Default: `none`
dlqQuorum.deliveryLimit::
When `quorum.enabled=true`, set a delivery limit after which the message is dropped or dead-lettered.
Applies only when `requiredGroups` are provided and then only to those groups.
+
Default: none - broker default will apply.
dlqQuorum.enabled::
When true, create a quorum dead letter queue instead of a classic queue.
Applies only when `requiredGroups` are provided and then only to those groups.
+
Default: false
dlqQuorum.initialQuorumSize::
When `quorum.enabled=true`, set the initial quorum size.
Applies only when `requiredGroups` are provided and then only to those groups.
+
Default: none - broker default will apply.
dlqTtl::
Default time (in milliseconds) to live to apply to the dead letter queue when declared.
Applies only when `requiredGroups` are provided and then only to those groups.
Expand Down Expand Up @@ -607,6 +646,21 @@ This is useful, for example, when using Spring Cloud Stream to consume from an e
Applies only when `requiredGroups` are provided and then only to those groups.
+
Default: false.
quorum.deliveryLimit::
When `quorum.enabled=true`, set a delivery limit after which the message is dropped or dead-lettered.
Applies only when `requiredGroups` are provided and then only to those groups.
+
Default: none - broker default will apply.
quorum.enabled::
When true, create a quorum queue instead of a classic queue.
Applies only when `requiredGroups` are provided and then only to those groups.
+
Default: false
quorum.initialQuorumSize::
When `quorum.enabled=true`, set the initial quorum size.
Applies only when `requiredGroups` are provided and then only to those groups.
+
Default: none - broker default will apply.
routingKeyExpression::
A SpEL expression to determine the routing key to use when publishing messages.
For a fixed routing key, use a literal expression, such as `routingKeyExpression='my.routingKey'` in a properties file or `routingKeyExpression: '''my.routingKey'''` in a YAML file.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,16 @@ public abstract class RabbitCommonProperties {
*/
private Map<String, String> dlqBindingArguments = new HashMap<>();

/**
* Configure the queue to be type quorum instead of classic.
*/
private QuorumConfig quorum = new QuorumConfig();

/**
* Configure the DLQ to be type quorum instead of classic.
*/
private QuorumConfig dlqQuorum = new QuorumConfig();

public String getExchangeType() {
return this.exchangeType;
}
Expand Down Expand Up @@ -484,4 +494,54 @@ public void setDlqBindingArguments(Map<String, String> dlqBindingArguments) {
this.dlqBindingArguments = dlqBindingArguments;
}

public QuorumConfig getQuorum() {
return this.quorum;
}

public void setQuorum(QuorumConfig quorum) {
this.quorum = quorum;
}

public QuorumConfig getDlqQuorum() {
return this.dlqQuorum;
}

public void setDlqQuorum(QuorumConfig dlqQuorum) {
this.dlqQuorum = dlqQuorum;
}

public static class QuorumConfig {

private boolean enabled;

private Integer initialGroupSize;

private Integer deliveryLimit;

public boolean isEnabled() {
return this.enabled;
}

public void setEnabled(boolean enabled) {
this.enabled = enabled;
}

public Integer getInitialGroupSize() {
return this.initialGroupSize;
}

public void setInitialGroupSize(Integer initialGroupSize) {
this.initialGroupSize = initialGroupSize;
}

public Integer getDeliveryLimit() {
return this.deliveryLimit;
}

public void setDeliveryLimit(Integer deliveryLimit) {
this.deliveryLimit = deliveryLimit;
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitCommonProperties;
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitCommonProperties.QuorumConfig;
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitConsumerProperties;
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitProducerProperties;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
Expand Down Expand Up @@ -499,6 +500,7 @@ private void additionalArgs(Map<String, Object> args,
boolean lazy = isDlq ? properties.isDlqLazy() : properties.isLazy();
String overflow = isDlq ? properties.getDlqOverflowBehavior()
: properties.getOverflowBehavior();
QuorumConfig quorum = isDlq ? properties.getDlqQuorum() : properties.getQuorum();
if (expires != null) {
args.put("x-expires", expires);
}
Expand All @@ -520,6 +522,15 @@ private void additionalArgs(Map<String, Object> args,
if (StringUtils.hasText(overflow)) {
args.put("x-overflow", overflow);
}
if (quorum != null && quorum.isEnabled()) {
args.put("x-queue-type", "quorum");
if (quorum.getDeliveryLimit() != null) {
args.put("x-delivery-limit", quorum.getDeliveryLimit());
}
if (quorum.getInitialGroupSize() != null) {
args.put("x-quorum-initial-group-size", quorum.getInitialGroupSize());
}
}
}

public static String applyPrefix(String prefix, String name) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
import org.springframework.cloud.stream.binder.PollableSource;
import org.springframework.cloud.stream.binder.RequeueCurrentMessageException;
import org.springframework.cloud.stream.binder.Spy;
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitCommonProperties.QuorumConfig;
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitConsumerProperties;
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitProducerProperties;
import org.springframework.cloud.stream.binder.rabbit.provisioning.RabbitExchangeQueueProvisioner;
Expand Down Expand Up @@ -399,6 +400,10 @@ public void testConsumerProperties() throws Exception {
properties.getExtension().setPrefetch(20);
properties.getExtension().setHeaderPatterns(new String[] { "foo" });
properties.getExtension().setTxSize(10);
QuorumConfig quorum = properties.getExtension().getQuorum();
quorum.setEnabled(true);
quorum.setDeliveryLimit(10);
quorum.setInitialGroupSize(1);
properties.setInstanceIndex(0);
consumerBinding = binder.bindConsumer("props.0", "test",
createBindableChannel("input", new BindingProperties()), properties);
Expand Down