Skip to content

Commit

Permalink
Implement new request style
Browse files Browse the repository at this point in the history
More protocol-friendly request pattern. Addresses issue #117.
  • Loading branch information
Tyler Treat committed Jun 2, 2017
1 parent 7f927ed commit 8c6ed7d
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 21 deletions.
22 changes: 20 additions & 2 deletions src/it/java/io/nats/client/ITBasicTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -430,8 +430,17 @@ public void testDoubleUnsubscribe() throws Exception {

@Test
public void testRequestTimeout() throws Exception {
testRequestTimeout(false);
}

@Test
public void testOldRequestTimeout() throws Exception {
testRequestTimeout(true);
}

private void testRequestTimeout(boolean useOldStyle) throws Exception {
try (NatsServer srv = runDefaultServer()) {
try (Connection c = newDefaultConnection()) {
try (Connection c = useOldStyle ? new Options.Builder().useOldRequestStyle(true).build().connect() : newDefaultConnection()) {
assertFalse(c.isClosed());
assertNull("should time out", c.request("foo", "help".getBytes(), 10));
}
Expand All @@ -440,9 +449,18 @@ public void testRequestTimeout() throws Exception {

@Test
public void testRequest() throws Exception {
testRequest(false);
}

@Test
public void testOldRequest() throws Exception {
testRequest(true);
}

private void testRequest(boolean useOldStyle) throws Exception {
final byte[] response = "I will help you.".getBytes();
try (NatsServer srv = runDefaultServer()) {
try (final Connection c = newDefaultConnection()) {
try (Connection c = useOldStyle ? new Options.Builder().useOldRequestStyle(true).build().connect() : newDefaultConnection()) {
sleep(100);
try (AsyncSubscription s = c.subscribe("foo", new MessageHandler() {
public void onMessage(Message msg) {
Expand Down
107 changes: 100 additions & 7 deletions src/main/java/io/nats/client/ConnectionImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.annotations.SerializedName;

import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
Expand All @@ -52,6 +53,7 @@
import java.util.Properties;
import java.util.Random;
import java.util.TimerTask;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
Expand All @@ -66,6 +68,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -75,6 +78,8 @@ class ConnectionImpl implements Connection {
private String version = null;

private static final String INBOX_PREFIX = "_INBOX.";
private static final int NUID_SIZE = 22;
private static final int RESP_INBOX_PREFIX_LEN = INBOX_PREFIX.length() + NUID_SIZE + 1;

private ConnState status = DISCONNECTED;

Expand All @@ -95,6 +100,11 @@ class ConnectionImpl implements Connection {
private long flushTimerInterval = 1;
private TimeUnit flushTimerUnit = TimeUnit.MILLISECONDS;

// New style response handler
private String respSub; // The wildcard subject
private Subscription respMux; // A single response subscription
private ConcurrentHashMap<String, BlockingQueue<Message>> respMap; // Request map for the response msg queues


protected static final String CRLF = "\r\n";
protected static final String _EMPTY_ = "";
Expand Down Expand Up @@ -429,7 +439,7 @@ Connection connect() throws IOException {
// Cancel out default connection refused, will trigger the
// No servers error conditional
if (e.getMessage() != null && e.getMessage().contains("Connection refused")) {
setLastError(null);
setLastError(null);
}
}
} // for
Expand Down Expand Up @@ -2059,15 +2069,32 @@ public void publish(Message msg) throws IOException {
@Override
public Message request(String subject, byte[] data, long timeout, TimeUnit unit)
throws IOException, InterruptedException {
String inbox = newInbox();
BlockingQueue<Message> ch = createMsgChannel(8);
if (opts.useOldRequestStyle) {
return oldRequest(subject, data, timeout, unit);
}

// Make sure scoped subscription is setup at least once on first call to request().
// Will handle duplicates in createRespMux.
createRespMux();

try (SyncSubscription sub = (SyncSubscription) subscribe(inbox, null, null, ch)) {
sub.autoUnsubscribe(1);
publish(subject, inbox, data);
return sub.nextMessage(timeout, unit);
// Create literal Inbox and map to a queue.
BlockingQueue<Message> queue = new ArrayBlockingQueue<>(1);
String respInbox = newRespInbox();
String token = respToken(respInbox);
respMap.put(token, queue);

publish(subject, respInbox, data);
Message response;
if (timeout < 0) {
response = queue.take();
} else {
response = queue.poll(timeout, unit);
if (response == null) {
// Timed out, cleanup queue.
respMap.remove(token);
}
}
return response;
}

@Override
Expand All @@ -2081,6 +2108,72 @@ public Message request(String subject, byte[] data) throws IOException, Interrup
return request(subject, data, -1, TimeUnit.MILLISECONDS);
}

// Creates the response subscription we will use for all new style responses. This will be on an _INBOX with an
// additional terminal token. The subscription will be on a wildcard.
private synchronized void createRespMux() {
if (respMap != null) {
// Already setup for responses.
return;
}

// _INBOX wildcard
respSub = String.format("%s.*", newInbox());
respMux = subscribe(respSub, new RespHandler());
respMap = new ConcurrentHashMap<>();
}

// Creates a new literal response subject that will trigger the global subscription handler.
private String newRespInbox() {
byte[] b = new byte[RESP_INBOX_PREFIX_LEN + NUID_SIZE];
byte[] respSubBytes = respSub.getBytes();
System.arraycopy(respSubBytes, 0, b, 0, RESP_INBOX_PREFIX_LEN);
byte[] nuid = NUID.nextGlobal().getBytes();
System.arraycopy(nuid, 0, b, RESP_INBOX_PREFIX_LEN, nuid.length);
return new String(b);
}

private String respToken(String respInbox) {
return respInbox.substring(RESP_INBOX_PREFIX_LEN);
}

/**
* RespHandler is the global response handler. It will look up the appropriate queue based on the last token and
* place the message on the queue if possible.
*/
private final class RespHandler implements MessageHandler {
@Override
public void onMessage(Message msg) {
String token = respToken(msg.getSubject());

// Just return if closed.
if (isClosed()) {
return;
}

BlockingQueue<Message> queue = respMap.get(token);
if (queue == null) {
// No response queue, drop the message.
return;
}

// Delete the key regardless, one response only.
respMap.remove(token);
queue.offer(msg);
}
}

private Message oldRequest(String subject, byte[] data, long timeout, TimeUnit unit)
throws IOException, InterruptedException {
String inbox = newInbox();
BlockingQueue<Message> ch = createMsgChannel(8);

try (SyncSubscription sub = (SyncSubscription) subscribe(inbox, null, null, ch)) {
sub.autoUnsubscribe(1);
publish(subject, inbox, data);
return sub.nextMessage(timeout, unit);
}
}

@Override
public String newInbox() {
return String.format("%s%s", INBOX_PREFIX, NUID.nextGlobal());
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/io/nats/client/Nats.java
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,10 @@ public enum ConnState {
* This property is defined as String {@value #PROP_URL}.
*/
public static final String PROP_URL = PFX + "url";
/**
* This property is defined as String {@value #PROP_USE_OLD_REQUEST_STYLE}.
*/
public static final String PROP_USE_OLD_REQUEST_STYLE = "use.old.request.style";

/*
* Constants
Expand Down
31 changes: 24 additions & 7 deletions src/main/java/io/nats/client/Options.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import static io.nats.client.Nats.PROP_TLS_DEBUG;
import static io.nats.client.Nats.PROP_URL;
import static io.nats.client.Nats.PROP_USERNAME;
import static io.nats.client.Nats.PROP_USE_OLD_REQUEST_STYLE;
import static io.nats.client.Nats.PROP_VERBOSE;

import java.io.IOException;
Expand Down Expand Up @@ -81,6 +82,7 @@ public class Options {
final int connectionTimeout;
final long pingInterval;
final int maxPingsOut;
final boolean useOldRequestStyle;
// Connection handlers
public ClosedCallback closedCb;
public DisconnectedCallback disconnectedCb;
Expand Down Expand Up @@ -122,6 +124,7 @@ private Options(Builder builder) {
this.connectionTimeout = builder.connectionTimeout;
this.pingInterval = builder.pingInterval;
this.maxPingsOut = builder.maxPingsOut;
this.useOldRequestStyle = builder.useOldRequestStyle;
this.sslContext = builder.sslContext;
this.tlsDebug = builder.tlsDebug;
this.disconnectedCb = builder.disconnectedCb;
Expand Down Expand Up @@ -160,6 +163,7 @@ && compare(connectionName, other.connectionName)
&& Integer.compare(connectionTimeout, other.connectionTimeout) == 0
&& Long.compare(pingInterval, other.pingInterval) == 0
&& Integer.compare(maxPingsOut, other.maxPingsOut) == 0
&& Boolean.compare(useOldRequestStyle, other.useOldRequestStyle) == 0
&& (sslContext == null ? other.sslContext == null : sslContext.equals(other
.sslContext))
&& Boolean.compare(tlsDebug, other.tlsDebug) == 0
Expand All @@ -177,7 +181,7 @@ && compare(connectionName, other.connectionName)
public int hashCode() {
return Objects.hash(url, username, password, token, servers, noRandomize, connectionName,
verbose, pedantic, secure, allowReconnect, maxReconnect, reconnectBufSize,
reconnectWait, connectionTimeout, pingInterval, maxPingsOut, sslContext, tlsDebug,
reconnectWait, connectionTimeout, pingInterval, maxPingsOut, useOldRequestStyle, sslContext, tlsDebug,
factory, disconnectedCb, closedCb, reconnectedCb, asyncErrorCb);
}

Expand Down Expand Up @@ -288,6 +292,8 @@ public int getMaxPingsOut() {
return maxPingsOut;
}

public boolean isUseOldRequestStyle() { return useOldRequestStyle; }

public ExceptionHandler getExceptionHandler() {
return asyncErrorCb;
}
Expand Down Expand Up @@ -342,12 +348,13 @@ public static final class Builder {
private boolean pedantic;
private boolean secure;
private boolean allowReconnect = true;
private int maxReconnect = Nats.DEFAULT_MAX_RECONNECT;
private int reconnectBufSize = Nats.DEFAULT_RECONNECT_BUF_SIZE;
private long reconnectWait = Nats.DEFAULT_RECONNECT_WAIT;
private int connectionTimeout = Nats.DEFAULT_TIMEOUT;
private long pingInterval = Nats.DEFAULT_PING_INTERVAL;
private int maxPingsOut = Nats.DEFAULT_MAX_PINGS_OUT;
private int maxReconnect = DEFAULT_MAX_RECONNECT;
private int reconnectBufSize = DEFAULT_RECONNECT_BUF_SIZE;
private long reconnectWait = DEFAULT_RECONNECT_WAIT;
private int connectionTimeout = DEFAULT_TIMEOUT;
private long pingInterval = DEFAULT_PING_INTERVAL;
private int maxPingsOut = DEFAULT_MAX_PINGS_OUT;
private boolean useOldRequestStyle;
private SSLContext sslContext;
private boolean tlsDebug;
private TcpConnectionFactory factory;
Expand Down Expand Up @@ -388,6 +395,7 @@ public Builder(Options template) {
this.reconnectedCb = template.reconnectedCb;
this.asyncErrorCb = template.asyncErrorCb;
this.factory = template.factory;
this.useOldRequestStyle = template.useOldRequestStyle;
}

public Builder() {
Expand Down Expand Up @@ -485,6 +493,10 @@ public Builder(Properties props) {
this.maxPingsOut = Integer.parseInt(
props.getProperty(PROP_MAX_PINGS, Integer.toString(DEFAULT_MAX_PINGS_OUT)));
}
// PROP_USE_OLD_REQUEST_STYLE
if (props.containsKey(PROP_USE_OLD_REQUEST_STYLE)) {
this.useOldRequestStyle = Boolean.parseBoolean(props.getProperty(PROP_USE_OLD_REQUEST_STYLE));
}
// PROP_EXCEPTION_HANDLER
if (props.containsKey(PROP_EXCEPTION_HANDLER)) {
Object instance;
Expand Down Expand Up @@ -554,6 +566,11 @@ public Builder maxPingsOut(int maxPingsOut) {
return this;
}

public Builder useOldRequestStyle(boolean useOldRequestStyle) {
this.useOldRequestStyle = useOldRequestStyle;
return this;
}

public Builder maxReconnect(int maxReconnect) {
this.maxReconnect = maxReconnect;
return this;
Expand Down
11 changes: 6 additions & 5 deletions src/test/java/io/nats/client/ConnectionImplTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1759,14 +1759,14 @@ public void testGetServerInfo() throws Exception {
}

@Test
public void testRequest() throws Exception {
public void testOldRequest() throws Exception {
final String inbox = "_INBOX.DEADBEEF";
// SyncSubscriptionImpl mockSub = mock(SyncSubscriptionImpl.class);
Message replyMsg = new Message();
replyMsg.setData("answer".getBytes());
replyMsg.setSubject(inbox);
when(syncSubMock.nextMessage(any(long.class), any(TimeUnit.class))).thenReturn(replyMsg);
try (ConnectionImpl c = (ConnectionImpl) spy(newMockedConnection())) {
Options opts = new Options.Builder().useOldRequestStyle(true).build();
try (ConnectionImpl c = (ConnectionImpl) spy(newMockedConnection(opts))) {
doReturn(inbox).when(c).newInbox();
doReturn(mchMock).when(c).createMsgChannel(anyInt());
doReturn(syncSubMock).when(c).subscribe(inbox, null, null,
Expand All @@ -1784,11 +1784,12 @@ public void testRequest() throws Exception {
}

@Test
public void testRequestErrors() throws Exception {
public void testOldRequestErrors() throws Exception {
final String errMsg = "testRequestErrors()";
thrown.expect(IOException.class);
thrown.expectMessage(errMsg);
try (ConnectionImpl nc = (ConnectionImpl) spy(newMockedConnection())) {
Options opts = new Options.Builder().useOldRequestStyle(true).build();
try (ConnectionImpl nc = (ConnectionImpl) spy(newMockedConnection(opts))) {
when(nc.subscribeSync("foo", null)).thenReturn(syncSubMock);
doThrow(new IOException(errMsg)).when(nc).publish(anyString(), anyString(),
any(byte[].class));
Expand Down

0 comments on commit 8c6ed7d

Please sign in to comment.