Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Part 2 in pull handling improvements #871

Merged
merged 7 commits into from
Mar 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 60 additions & 84 deletions src/main/java/io/nats/client/impl/MessageManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,46 +19,49 @@
import java.time.Duration;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicLong;

abstract class MessageManager {
protected static final int THRESHOLD = 3;

protected final Object stateChangeLock;
protected final NatsConnection conn;
protected final boolean syncMode;

protected NatsJetStreamSubscription sub;
protected NatsJetStreamSubscription sub; // not final it is not set until after construction

protected long lastStreamSeq;
protected long internalConsumerSeq;

protected final AtomicLong lastMsgReceived;
protected long lastConsumerSeq;
protected long lastMsgReceived;

// heartbeat stuff
protected boolean hb;
protected long idleHeartbeatSetting;
protected long alarmPeriodSetting;
protected HeartbeatTimer heartbeatTimer;
protected TimerTask heartbeatTimerTask;
protected Timer heartbeatTimer;

protected MessageManager(NatsConnection conn, boolean syncMode) {
stateChangeLock = new Object();

this.conn = conn;
this.syncMode = syncMode;
lastStreamSeq = 0;
internalConsumerSeq = 0;
lastMsgReceived = new AtomicLong(System.currentTimeMillis());
lastConsumerSeq = 0;

hb = false;
idleHeartbeatSetting = 0;
alarmPeriodSetting = 0;

lastMsgReceived = System.currentTimeMillis();
}

protected boolean isSyncMode() { return syncMode; }
protected long getLastStreamSequence() { return lastStreamSeq; }
protected long getInternalConsumerSequence() { return internalConsumerSeq; }
protected long getLastMsgReceived() { return lastMsgReceived.get(); }
protected boolean isHb() { return hb; }
protected long getIdleHeartbeatSetting() { return idleHeartbeatSetting; }
protected long getAlarmPeriodSetting() { return alarmPeriodSetting; }
protected boolean isSyncMode() { return syncMode; }
protected long getLastStreamSequence() { return lastStreamSeq; }
protected long getLastConsumerSequence() { return lastConsumerSeq; }
protected long getLastMsgReceived() { return lastMsgReceived; }
protected boolean isHb() { return hb; }
protected long getIdleHeartbeatSetting() { return idleHeartbeatSetting; }
protected long getAlarmPeriodSetting() { return alarmPeriodSetting; }

protected void startup(NatsJetStreamSubscription sub) {
this.sub = sub;
Expand All @@ -73,7 +76,9 @@ protected void startPullRequest(PullRequestOptions pullRequestOptions) {
}

protected void messageReceived() {
lastMsgReceived.set(System.currentTimeMillis());
synchronized (stateChangeLock) {
lastMsgReceived = System.currentTimeMillis();
}
}

protected Boolean beforeQueueProcessorImpl(NatsMessage msg) {
Expand All @@ -83,87 +88,58 @@ protected Boolean beforeQueueProcessorImpl(NatsMessage msg) {
abstract protected boolean manage(Message msg);

protected void trackJsMessage(Message msg) {
NatsJetStreamMetaData meta = msg.metaData();
lastStreamSeq = meta.streamSequence();
internalConsumerSeq++;
synchronized (stateChangeLock) {
NatsJetStreamMetaData meta = msg.metaData();
lastStreamSeq = meta.streamSequence();
lastConsumerSeq++;
}
}

protected void initIdleHeartbeat(Duration configIdleHeartbeat, long configMessageAlarmTime) {
idleHeartbeatSetting = configIdleHeartbeat == null ? 0 : configIdleHeartbeat.toMillis();
if (idleHeartbeatSetting <= 0) {
alarmPeriodSetting = 0;
hb = false;
}
else {
if (configMessageAlarmTime < idleHeartbeatSetting) {
alarmPeriodSetting = idleHeartbeatSetting * THRESHOLD;
protected void handleHeartbeatError() {
conn.executeCallback((c, el) -> el.heartbeatAlarm(c, sub, lastStreamSeq, lastConsumerSeq));
}

protected void configureIdleHeartbeat(Duration configIdleHeartbeat, long configMessageAlarmTime) {
synchronized (stateChangeLock) {
idleHeartbeatSetting = configIdleHeartbeat == null ? 0 : configIdleHeartbeat.toMillis();
if (idleHeartbeatSetting <= 0) {
alarmPeriodSetting = 0;
hb = false;
}
else {
alarmPeriodSetting = configMessageAlarmTime;
if (configMessageAlarmTime < idleHeartbeatSetting) {
alarmPeriodSetting = idleHeartbeatSetting * THRESHOLD;
}
else {
alarmPeriodSetting = configMessageAlarmTime;
}
hb = true;
}
hb = true;
}
}

protected void initOrResetHeartbeatTimer() {
if (heartbeatTimer == null) {
heartbeatTimer = new HeartbeatTimer(alarmPeriodSetting);
}
else {
heartbeatTimer.restart();
synchronized (stateChangeLock) {
shutdownHeartbeatTimer();
heartbeatTimer = new Timer();
heartbeatTimerTask = new TimerTask() {
@Override
public void run() {
long sinceLast = System.currentTimeMillis() - lastMsgReceived;
if (sinceLast > alarmPeriodSetting) {
handleHeartbeatError();
}
}
};
heartbeatTimer.schedule(heartbeatTimerTask, alarmPeriodSetting, alarmPeriodSetting);
}
}

protected void shutdownHeartbeatTimer() {
if (heartbeatTimer != null) {
heartbeatTimer.shutdown();
heartbeatTimer = null;
}
}

protected void handleHeartbeatError() {
conn.executeCallback((c, el) -> el.heartbeatAlarm(c, sub, lastStreamSeq, internalConsumerSeq));
}

protected class HeartbeatTimer {
protected Timer timer;
protected boolean alive = true;
protected long alarmPeriodSetting;

protected class HeartbeatTimerTask extends TimerTask {
@Override
public void run() {
long sinceLast = System.currentTimeMillis() - lastMsgReceived.get();
if (sinceLast > alarmPeriodSetting) {
handleHeartbeatError();
}
restart();
}
}

protected HeartbeatTimer(long alarmPeriodSetting) {
this.alarmPeriodSetting = alarmPeriodSetting;
restart();
}

synchronized protected void restart() {
cancel();
if (alive) {
timer = new Timer();
timer.schedule(new HeartbeatTimerTask(), alarmPeriodSetting);
}
}

synchronized protected void shutdown() {
alive = false;
cancel();
}

private void cancel() {
if (timer != null) {
timer.cancel();
timer.purge();
timer = null;
synchronized (stateChangeLock) {
if (heartbeatTimer != null) {
heartbeatTimer.cancel();
heartbeatTimer = null;
}
}
}
Expand Down
28 changes: 13 additions & 15 deletions src/main/java/io/nats/client/impl/NatsJetStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ MessageManager createMessageManager(
SubscribeOptions so, ConsumerConfiguration cc, boolean queueMode, boolean syncMode);
}

MessageManagerFactory _pushStandardMessageManagerFactory = PushMessageManager::new;
MessageManagerFactory _pushMessageManagerFactory = PushMessageManager::new;
MessageManagerFactory _pushOrderedMessageManagerFactory = OrderedMessageManager::new;
MessageManagerFactory _pullMessageManagerFactory =
(mmConn, mmJs, mmStream, mmSo, mmCc, mmQueueMode, mmSyncMode) -> new PullMessageManager(mmConn, mmSyncMode);
Expand Down Expand Up @@ -397,7 +397,6 @@ else if (so.isBind()) {
}

// 6. create the subscription. lambda needs final or effectively final vars
NatsJetStreamSubscription sub;
final MessageManager mm;
final NatsSubscriptionFactory subFactory;
if (isPullMode) {
Expand All @@ -406,7 +405,7 @@ else if (so.isBind()) {
-> new NatsJetStreamPullSubscription(sid, lSubject, lConn, this, fnlStream, settledConsumerName, mm);
}
else {
MessageManagerFactory mmFactory = so.isOrdered() ? _pushOrderedMessageManagerFactory : _pushStandardMessageManagerFactory;
MessageManagerFactory mmFactory = so.isOrdered() ? _pushOrderedMessageManagerFactory : _pushMessageManagerFactory;
mm = mmFactory.createMessageManager(conn, this, fnlStream, so, settledServerCC, false, dispatcher == null);
subFactory = (sid, lSubject, lQgroup, lConn, lDispatcher) -> {
NatsJetStreamSubscription nsub = new NatsJetStreamSubscription(sid, lSubject, lQgroup, lConn, lDispatcher,
Expand All @@ -417,11 +416,12 @@ else if (so.isBind()) {
return nsub;
};
}
NatsJetStreamSubscription sub;
if (dispatcher == null) {
sub = (NatsJetStreamSubscription) conn.createSubscription(fnlInboxDeliver, qgroup, null, subFactory);
}
else {
AsyncMessageHandler handler = new AsyncMessageHandler(userHandler, isAutoAck, settledServerCC, mm);
AsyncMessageHandler handler = new AsyncMessageHandler(mm, userHandler, isAutoAck, settledServerCC);
sub = (NatsJetStreamSubscription) dispatcher.subscribeImplJetStream(fnlInboxDeliver, qgroup, handler, subFactory);
}

Expand Down Expand Up @@ -484,26 +484,24 @@ public List<String> getChanges(ConsumerConfiguration serverCc) {

static class AsyncMessageHandler implements MessageHandler {
MessageManager manager;
List<MessageHandler> handlers;

public AsyncMessageHandler(MessageHandler userHandler, boolean isAutoAck, ConsumerConfiguration settledServerCC, MessageManager manager) {
handlers = new ArrayList<>();
handlers.add(userHandler);
if (isAutoAck && settledServerCC.getAckPolicy() != AckPolicy.None) {
handlers.add(Message::ack);
};
MessageHandler userHandler;
boolean autoAck;

public AsyncMessageHandler(MessageManager manager, MessageHandler userHandler, boolean isAutoAck, ConsumerConfiguration settledServerCC) {
this.manager = manager;
this.userHandler = userHandler;
autoAck = isAutoAck && settledServerCC.getAckPolicy() != AckPolicy.None;
}

@Override
public void onMessage(Message msg) throws InterruptedException {
if (manager.manage(msg)) {
return;
return; // manager handled the message
}

for (MessageHandler mh : handlers) {
mh.onMessage(msg);
userHandler.onMessage(msg);
if (autoAck) {
msg.ack();
}
}
}
Expand Down
18 changes: 8 additions & 10 deletions src/main/java/io/nats/client/impl/OrderedMessageManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,41 +17,39 @@
import io.nats.client.SubscribeOptions;
import io.nats.client.api.ConsumerConfiguration;
import io.nats.client.api.DeliverPolicy;
import io.nats.client.support.Status;

import java.util.concurrent.atomic.AtomicReference;

class OrderedMessageManager extends PushMessageManager {

private long expectedExternalConsumerSeq;
private final AtomicReference<String> targetSid;
protected long expectedExternalConsumerSeq;
protected final AtomicReference<String> targetSid;

protected OrderedMessageManager(NatsConnection conn,
NatsJetStream js,
String stream,
SubscribeOptions so,
ConsumerConfiguration serverCC,
ConsumerConfiguration originalCc,
boolean queueMode,
boolean syncMode) {
super(conn, js, stream, so, serverCC, queueMode, syncMode);
super(conn, js, stream, so, originalCc, queueMode, syncMode);
expectedExternalConsumerSeq = 1; // always starts at 1
targetSid = new AtomicReference<>();
}

@Override
protected void startup(NatsJetStreamSubscription sub) {
targetSid.set(sub.getSID());
super.startup(sub);
targetSid.set(sub.getSID());
}

@Override
protected boolean manage(Message msg) {
if (!msg.getSID().equals(targetSid.get())) {
return true;
return true; // wrong sid is throwaway from previous consumer that errored
}

Status status = msg.getStatus();
if (status == null) {
if (msg.isJetStream()) {
long receivedConsumerSeq = msg.metaData().consumerSequence();
if (expectedExternalConsumerSeq != receivedConsumerSeq) {
handleErrorCondition();
Expand All @@ -63,7 +61,7 @@ protected boolean manage(Message msg) {
}

super.manageStatus(msg);
return true; // all status are managed
return true; // all statuses are managed
}

private void handleErrorCondition() {
Expand Down
Loading