Skip to content

Commit

Permalink
BeforeQueueProcessor must be per subscription, not per connection
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf committed Nov 12, 2021
1 parent 39eae74 commit dd22499
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 10 deletions.
10 changes: 1 addition & 9 deletions src/main/java/io/nats/client/impl/NatsConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -99,8 +98,6 @@ class NatsConnection implements Connection {

private String currentServer = null;

private Function<NatsMessage, NatsMessage> beforeQueueProcessor;

NatsConnection(Options options) {
boolean trace = options.isTraceConnection();
timeTrace(trace, "creating connection object");
Expand Down Expand Up @@ -147,14 +144,9 @@ class NatsConnection implements Connection {

this.needPing = new AtomicBoolean(true);

beforeQueueProcessor = msg -> msg; // default just returns the message
timeTrace(trace, "connection object created");
}

void setBeforeQueueProcessor(Function<NatsMessage, NatsMessage> beforeQueueProcessor) {
this.beforeQueueProcessor = beforeQueueProcessor;
}

// Connect is only called after creation
void connect(boolean reconnectOnConnect) throws InterruptedException, IOException {
if (options.getServers().size() == 0) {
Expand Down Expand Up @@ -1379,7 +1371,7 @@ void deliverMessage(NatsMessage msg) {
// does not need to be queued, for instance heartbeats
// that are not flow control and are already seen by the
// auto status manager
msg = beforeQueueProcessor.apply(msg);
msg = sub.getBeforeQueueProcessor().apply(msg);
if (msg != null) {
q.push(msg);
}
Expand Down
13 changes: 13 additions & 0 deletions src/main/java/io/nats/client/impl/NatsSubscription.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

class NatsSubscription extends NatsConsumer implements Subscription {

Expand All @@ -31,6 +32,8 @@ class NatsSubscription extends NatsConsumer implements Subscription {

private AtomicLong unSubMessageLimit;

private Function<NatsMessage, NatsMessage> beforeQueueProcessor;

NatsSubscription(String sid, String subject, String queueName, NatsConnection connection,
NatsDispatcher dispatcher) {
super(connection);
Expand All @@ -43,12 +46,22 @@ class NatsSubscription extends NatsConsumer implements Subscription {
if (this.dispatcher == null) {
this.incoming = new MessageQueue(false);
}

beforeQueueProcessor = m -> m;
}

public boolean isActive() {
return (this.dispatcher != null || this.incoming != null);
}

void setBeforeQueueProcessor(Function<NatsMessage, NatsMessage> beforeQueueProcessor) {
this.beforeQueueProcessor = beforeQueueProcessor == null ? m -> m : beforeQueueProcessor;
}

public Function<NatsMessage, NatsMessage> getBeforeQueueProcessor() {
return beforeQueueProcessor;
}

void invalidate() {
if (this.incoming != null) {
this.incoming.pause();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public class PushAutoStatusManager implements AutoStatusManager {
public void setSub(NatsJetStreamSubscription sub) {
this.sub = sub;
if (hb) {
conn.setBeforeQueueProcessor(this::beforeQueueProcessor);
sub.setBeforeQueueProcessor(this::beforeQueueProcessor);
asmTimer = new AsmTimer();
}
}
Expand Down

0 comments on commit dd22499

Please sign in to comment.