Skip to content

Commit

Permalink
ordered consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf committed Nov 12, 2021
1 parent 39eae74 commit 6aeeab4
Show file tree
Hide file tree
Showing 18 changed files with 690 additions and 110 deletions.
4 changes: 2 additions & 2 deletions src/main/java/io/nats/client/PullSubscribeOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@

/**
* The PullSubscribeOptions class specifies the options for subscribing with JetStream enabled servers.
* Options are set using the {@link Builder} or static helper methods.
* Options are set using the {@link PullSubscribeOptions.Builder} or static helper methods.
*/
public class PullSubscribeOptions extends SubscribeOptions {

private PullSubscribeOptions(Builder builder) {
super(builder, true, null, null);
super(builder, true, false, null, null);
}

/**
Expand Down
19 changes: 15 additions & 4 deletions src/main/java/io/nats/client/PushSubscribeOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@

/**
* The PushSubscribeOptions class specifies the options for subscribing with JetStream enabled servers.
* Options are set using the {@link PullSubscribeOptions.Builder} or static helper methods.
* Options are set using the {@link PushSubscribeOptions.Builder} or static helper methods.
*/
public class PushSubscribeOptions extends SubscribeOptions {

private PushSubscribeOptions(Builder builder, String deliverSubject, String deliverGroup) {
super(builder, false, deliverSubject, deliverGroup);
private PushSubscribeOptions(Builder builder, boolean ordered, String deliverSubject, String deliverGroup) {
super(builder, false, ordered, deliverSubject, deliverGroup);
}

/**
Expand Down Expand Up @@ -92,6 +92,7 @@ public static Builder builder() {
*/
public static class Builder
extends SubscribeOptions.Builder<Builder, PushSubscribeOptions> {
private boolean ordered;
private String deliverSubject;
private String deliverGroup;

Expand All @@ -100,6 +101,16 @@ protected Builder getThis() {
return this;
}

/**
* Set the ordered consumer flag
* @param ordered flag indicating whether this subscription should be ordered
* @return the builder.
*/
public Builder ordered(boolean ordered) {
this.ordered = ordered;
return this;
}

/**
* Setting this specifies the push model to a delivery subject.
* Null or empty clears the field.
Expand Down Expand Up @@ -128,7 +139,7 @@ public Builder deliverGroup(String deliverGroup) {
*/
@Override
public PushSubscribeOptions build() {
return new PushSubscribeOptions(this, deliverSubject, deliverGroup);
return new PushSubscribeOptions(this, ordered, deliverSubject, deliverGroup);
}
}
}
Expand Down
69 changes: 60 additions & 9 deletions src/main/java/io/nats/client/SubscribeOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@

package io.nats.client;

import io.nats.client.api.AckPolicy;
import io.nats.client.api.ConsumerConfiguration;

import java.time.Duration;

import static io.nats.client.support.NatsJetStreamClientError.*;
import static io.nats.client.support.Validator.*;

Expand All @@ -23,33 +26,68 @@
*/
public abstract class SubscribeOptions {

private static final long DEFAULT_ORDERED_HEARTBEAT = 5000;

protected final String stream;
protected final boolean pull;
protected final boolean bind;
protected final ConsumerConfiguration consumerConfig;
protected final boolean ordered;
protected final long messageAlarmTime;
protected final ConsumerConfiguration consumerConfig;

@SuppressWarnings("rawtypes") // Don't need the type of the builder to get it's vars
protected SubscribeOptions(Builder builder, boolean pull, String deliverSubject, String deliverGroup) {
protected SubscribeOptions(Builder builder, boolean isPull, boolean isOrdered, String deliverSubject, String deliverGroup) {

this.stream = validateStreamName(builder.stream, builder.bind); // required when bind mode
pull = isPull;
bind = builder.bind;
ordered = isOrdered;
messageAlarmTime = builder.messageAlarmTime;

if (ordered) {
if (pull) { throw JsSoOrderedNotAllowedWithPull.instance(); }
if (bind) { throw JsSoOrderedNotAllowedWithBind.instance(); }
}

stream = validateStreamName(builder.stream, bind); // required when bind mode

String durable = validateMustMatchIfBothSupplied(builder.durable, builder.cc == null ? null : builder.cc.getDurable(), JsSoDurableMismatch);
durable = validateDurable(durable, pull || builder.bind); // required when pull or bind
durable = validateDurable(durable, pull || bind); // required when pull or bind

deliverGroup = validateMustMatchIfBothSupplied(deliverGroup, builder.cc == null ? null : builder.cc.getDeliverGroup(), JsSoDeliverGroupMismatch);

deliverSubject = validateMustMatchIfBothSupplied(deliverSubject, builder.cc == null ? null : builder.cc.getDeliverSubject(), JsSoDeliverSubjectGroupMismatch);

this.consumerConfig = ConsumerConfiguration.builder(builder.cc)
if (isOrdered) {
validateNotSupplied(deliverGroup, JsSoOrderedNotAllowedWithDeliverGroup);
validateNotSupplied(durable, JsSoOrderedNotAllowedWithDurable);
validateNotSupplied(deliverSubject, JsSoOrderedNotAllowedWithDeliverSubject);
long hb = DEFAULT_ORDERED_HEARTBEAT;
if (builder.cc != null) {
if (builder.cc.ackPolicyWasSet() && builder.cc.getAckPolicy() != AckPolicy.None) {
throw JsSoOrderedRequiresAckPolicyNone.instance();
}
if (builder.cc.getMaxDeliver() > 1) {
throw JsSoOrderedRequiresMaxDeliver1.instance();
}
Duration ccHb = builder.cc.getIdleHeartbeat();
if (ccHb != null && ccHb.toMillis() > hb) {
hb = ccHb.toMillis();
}
}
consumerConfig = ConsumerConfiguration.builder(builder.cc)
.ackPolicy(AckPolicy.None)
.maxDeliver(1)
.flowControl(hb)
.ackWait(Duration.ofHours(22))
.build();
}
else {
consumerConfig = ConsumerConfiguration.builder(builder.cc)
.durable(durable)
.deliverSubject(deliverSubject)
.deliverGroup(deliverGroup)
.build();

this.pull = pull;
this.bind = builder.bind;
this.messageAlarmTime = builder.messageAlarmTime;
}
}

/**
Expand Down Expand Up @@ -84,6 +122,19 @@ public boolean isBind() {
return bind;
}

/**
* Gets whether this subscription is expected to ensure messages come in order
* @return the ordered flag
*/
public boolean isOrdered() {
return ordered;
}

/**
* Get the time amount of time allowed to elapse without a heartbeat.
* If not set will default to 3 times the idle heartbeat setting
* @return the message alarm time
*/
public long getMessageAlarmTime() {
return messageAlarmTime;
}
Expand Down
84 changes: 80 additions & 4 deletions src/main/java/io/nats/client/api/ConsumerConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,86 @@ public boolean getHeadersOnly() {
return headersOnly != null && headersOnly;
}

/**
* Gets whether deliver policy of this consumer configuration was set or left unset
* @return true if the policy was set, false if the policy was not set
*/
public boolean deliverPolicyWasSet() {
return deliverPolicy != null;
}

/**
* Gets whether ack policy for this consumer configuration was set or left unset
* @return true if the policy was set, false if the policy was not set
*/
public boolean ackPolicyWasSet() {
return ackPolicy != null;
}

/**
* Gets whether replay policy for this consumer configuration was set or left unset
* @return true if the policy was set, false if the policy was not set
*/
public boolean replayPolicyWasSet() {
return replayPolicy != null;
}

/**
* Gets whether start sequence for this consumer configuration was set or left unset
* @return true if the policy was set, false if the policy was not set
*/
public boolean startSeqWasSet() {
return startSeq != null;
}

/**
* Gets whether max deliver for this consumer configuration was set or left unset
* @return true if the policy was set, false if the policy was not set
*/
public boolean maxDeliverWasSet() {
return maxDeliver != null;
}

/**
* Gets whether rate limit for this consumer configuration was set or left unset
* @return true if the policy was set, false if the policy was not set
*/
public boolean rateLimitWasSet() {
return rateLimit != null;
}

/**
* Gets whether max ack pending for this consumer configuration was set or left unset
* @return true if the policy was set, false if the policy was not set
*/
public boolean maxAckPendingWasSet() {
return maxAckPending != null;
}

/**
* Gets whether max pull waiting for this consumer configuration was set or left unset
* @return true if the policy was set, false if the policy was not set
*/
public boolean maxPullWaitingWasSet() {
return maxPullWaiting != null;
}

/**
* Gets whether flow control for this consumer configuration was set or left unset
* @return true if the policy was set, false if the policy was not set
*/
public boolean flowControlWasSet() {
return flowControl != null;
}

/**
* Gets whether headers only for this consumer configuration was set or left unset
* @return true if the policy was set, false if the policy was not set
*/
public boolean headersOnlyWasSet() {
return headersOnly != null;
}

public boolean wouldBeChangeTo(ConsumerConfiguration original) {
return (deliverPolicy != null && deliverPolicy != original.deliverPolicy)
|| (ackPolicy != null && ackPolicy != original.ackPolicy)
Expand Down Expand Up @@ -753,10 +833,6 @@ long valueOrInitial(Long val) {
return val == null ? initial : val;
}

long initial(long val) {
return val < min ? initial : val;
}

public long comparable(Long val) {
return val == null || val < min || val == server ? initial : val;
}
Expand Down
20 changes: 20 additions & 0 deletions src/main/java/io/nats/client/impl/MessageManager.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright 2021 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package io.nats.client.impl;

import io.nats.client.Message;

public interface MessageManager {
boolean manage(Message msg);
}
10 changes: 1 addition & 9 deletions src/main/java/io/nats/client/impl/NatsConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -99,8 +98,6 @@ class NatsConnection implements Connection {

private String currentServer = null;

private Function<NatsMessage, NatsMessage> beforeQueueProcessor;

NatsConnection(Options options) {
boolean trace = options.isTraceConnection();
timeTrace(trace, "creating connection object");
Expand Down Expand Up @@ -147,14 +144,9 @@ class NatsConnection implements Connection {

this.needPing = new AtomicBoolean(true);

beforeQueueProcessor = msg -> msg; // default just returns the message
timeTrace(trace, "connection object created");
}

void setBeforeQueueProcessor(Function<NatsMessage, NatsMessage> beforeQueueProcessor) {
this.beforeQueueProcessor = beforeQueueProcessor;
}

// Connect is only called after creation
void connect(boolean reconnectOnConnect) throws InterruptedException, IOException {
if (options.getServers().size() == 0) {
Expand Down Expand Up @@ -1379,7 +1371,7 @@ void deliverMessage(NatsMessage msg) {
// does not need to be queued, for instance heartbeats
// that are not flow control and are already seen by the
// auto status manager
msg = beforeQueueProcessor.apply(msg);
msg = sub.getBeforeQueueProcessor().apply(msg);
if (msg != null) {
q.push(msg);
}
Expand Down

0 comments on commit 6aeeab4

Please sign in to comment.