Skip to content

Commit

Permalink
Merge 432c7fd into 28ae1cc
Browse files Browse the repository at this point in the history
  • Loading branch information
Tim Marston committed Feb 15, 2018
2 parents 28ae1cc + 432c7fd commit 7dfbf3a
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 46 deletions.
19 changes: 12 additions & 7 deletions src/it/java/io/nats/client/ITSubscriptionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -703,13 +703,18 @@ public void onMessage(Message msg) {
public void onMessage(Message msg) {
// System.err.println("Responder");
String responseInbox = c.newInbox();
c.subscribe(responseInbox, new MessageHandler() {
public void onMessage(Message msg) {
// System.err.println("Internal subscriber.");
sleep(100);
latch.countDown();
}
});
try {
c.subscribe(responseInbox, new MessageHandler() {
public void onMessage(Message msg) {
// System.err.println("Internal subscriber.");
sleep(100);
latch.countDown();
}
});
} catch(IOException e) {
e.printStackTrace();
}

// System.err.println("starter subscribed");
sleep(100);
try {
Expand Down
24 changes: 16 additions & 8 deletions src/main/java/io/nats/client/AbstractConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ public interface AbstractConnection extends AutoCloseable {
* @throws IllegalArgumentException if the subject name contains illegal characters.
* @throws NullPointerException if the subject name is null
* @throws IllegalStateException if the connection is closed
* @throws IOException
*/
SyncSubscription subscribe(String subject);
SyncSubscription subscribe(String subject) throws IOException;

/**
* Creates a {@link SyncSubscription} with interest in a given subject. All subscribers with the
Expand All @@ -37,8 +38,9 @@ public interface AbstractConnection extends AutoCloseable {
* @throws IllegalArgumentException if the subject (or queue) name contains illegal characters.
* @throws NullPointerException if the subject name is null
* @throws IllegalStateException if the connection is closed
* @throws IOException
*/
SyncSubscription subscribe(String subject, String queue);
SyncSubscription subscribe(String subject, String queue) throws IOException;

/**
* Creates a {@code AsyncSubscription} with interest in a given subject, assign the callback,
Expand All @@ -51,8 +53,9 @@ public interface AbstractConnection extends AutoCloseable {
* @throws IllegalArgumentException if the subject (or queue) name contains illegal characters.
* @throws NullPointerException if the subject name is null
* @throws IllegalStateException if the connection is closed
* @throws IOException
*/
AsyncSubscription subscribe(String subject, MessageHandler cb);
AsyncSubscription subscribe(String subject, MessageHandler cb) throws IOException;

/**
* Creates an asynchronous queue subscriber on a given subject of interest. All subscribers with
Expand All @@ -64,8 +67,9 @@ public interface AbstractConnection extends AutoCloseable {
* @param cb a {@code MessageHandler} object used to process messages received by the
* {@code Subscription}
* @return {@code Subscription}
* @throws IOException
*/
AsyncSubscription subscribe(String subject, String queue, MessageHandler cb);
AsyncSubscription subscribe(String subject, String queue, MessageHandler cb) throws IOException;

/**
* Creates a {@code AsyncSubscription} with interest in a given subject, assign the callback,
Expand All @@ -78,9 +82,10 @@ public interface AbstractConnection extends AutoCloseable {
* @throws IllegalArgumentException if the subject (or queue) name contains illegal characters.
* @throws NullPointerException if the subject name is null
* @throws IllegalStateException if the connection is closed
* @throws IOException
* @deprecated As of release 0.6, use {@link #subscribe(String, MessageHandler)} instead
*/
AsyncSubscription subscribeAsync(String subject, MessageHandler cb);
AsyncSubscription subscribeAsync(String subject, MessageHandler cb) throws IOException;

/**
* Create an {@code AsyncSubscription} with interest in a given subject, assign the message
Expand All @@ -93,9 +98,10 @@ public interface AbstractConnection extends AutoCloseable {
* @throws IllegalArgumentException if the subject (or queue) name contains illegal characters.
* @throws NullPointerException if the subject name is null
* @throws IllegalStateException if the connection is closed
* @throws IOException
* @deprecated As of release 0.6, use {@link #subscribe(String, String, MessageHandler)} instead
*/
AsyncSubscription subscribeAsync(String subject, String queue, MessageHandler cb);
AsyncSubscription subscribeAsync(String subject, String queue, MessageHandler cb) throws IOException;

/**
* Creates a synchronous queue subscriber on a given subject of interest. All subscribers with
Expand All @@ -109,8 +115,9 @@ public interface AbstractConnection extends AutoCloseable {
* @throws IllegalArgumentException if the subject (or queue) name contains illegal characters.
* @throws NullPointerException if the subject name is null
* @throws IllegalStateException if the connection is closed
* @throws IOException
*/
SyncSubscription subscribeSync(String subject, String queue);
SyncSubscription subscribeSync(String subject, String queue) throws IOException;

/**
* Creates a {@link SyncSubscription} with interest in a given subject. In order to receive
Expand All @@ -121,8 +128,9 @@ public interface AbstractConnection extends AutoCloseable {
* @throws IllegalArgumentException if the subject name contains illegal characters.
* @throws NullPointerException if the subject name is null
* @throws IllegalStateException if the connection is closed
* @throws IOException
*/
SyncSubscription subscribeSync(String subject);
SyncSubscription subscribeSync(String subject) throws IOException;

/**
* Creates a new, uniquely named inbox with the prefix '_INBOX.'
Expand Down
59 changes: 28 additions & 31 deletions src/main/java/io/nats/client/ConnectionImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -910,18 +910,14 @@ boolean closed() {

// flushReconnectPending will push the pending items that were
// gathered while we were in a RECONNECTING state to the socket.
void flushReconnectPendingItems() {
void flushReconnectPendingItems() throws IOException {
if (pending == null) {
return;
}

if (pending.size() > 0) {
try {
bw.write(pending.toByteArray(), 0, pending.size());
bw.flush();
} catch (IOException e) {
// NOOP
}
bw.write(pending.toByteArray(), 0, pending.size());
bw.flush();
}

pending = null;
Expand Down Expand Up @@ -1020,14 +1016,14 @@ public void run() {
// cur.didConnect = true;
cur.reconnects = 0;

// Send existing subscription state
resendSubscriptions();
try {
// Send existing subscription state
resendSubscriptions();

// Now send off and clear pending buffer
flushReconnectPendingItems();
// Now send off and clear pending buffer
flushReconnectPendingItems();

// Flush the buffer
try {
// Flush the buffer
getOutputStream().flush();
} catch (IOException e) {
setLastError(e);
Expand Down Expand Up @@ -1740,7 +1736,7 @@ public void flush() throws IOException, InterruptedException {

// resendSubscriptions will send our subscription state back to the
// server. Used in reconnects
void resendSubscriptions() {
void resendSubscriptions() throws IOException {
long adjustedMax = 0L;
for (Map.Entry<Long, SubscriptionImpl> entry : subs.entrySet()) {
SubscriptionImpl sub = entry.getValue();
Expand Down Expand Up @@ -1789,7 +1785,7 @@ void resendSubscriptions() {
* @return the Subscription object
*/
SubscriptionImpl subscribe(String subject, String queue, MessageHandler cb,
BlockingQueue<Message> ch) {
BlockingQueue<Message> ch) throws IOException {
final SubscriptionImpl sub;
mu.lock();
try {
Expand Down Expand Up @@ -1830,7 +1826,12 @@ public void run() {

// Send SUB proto
if (!reconnecting()) {
sendSubscriptionMessage(sub);
try {
sendSubscriptionMessage(sub);
} catch(IOException e) {
removeSub(sub);
throw e;
}
}

kickFlusher();
Expand All @@ -1842,34 +1843,34 @@ public void run() {
}

@Override
public SyncSubscription subscribe(String subject) {
public SyncSubscription subscribe(String subject) throws IOException {
return subscribeSync(subject, null);
}

@Override
public SyncSubscription subscribe(String subject, String queue) {
public SyncSubscription subscribe(String subject, String queue) throws IOException {
return subscribeSync(subject, queue);
}

@Override
public AsyncSubscription subscribe(String subject, MessageHandler cb) {
public AsyncSubscription subscribe(String subject, MessageHandler cb) throws IOException {
return (AsyncSubscription) subscribe(subject, null, cb);
}

@Override
public AsyncSubscription subscribe(String subj, String queue, MessageHandler cb) {
public AsyncSubscription subscribe(String subj, String queue, MessageHandler cb) throws IOException {
return (AsyncSubscription) subscribe(subj, queue, cb, null);
}

@Override
@Deprecated
public AsyncSubscription subscribeAsync(String subject, String queue, MessageHandler cb) {
public AsyncSubscription subscribeAsync(String subject, String queue, MessageHandler cb) throws IOException {
return (AsyncSubscription) subscribe(subject, queue, cb, null);
}

@Override
@Deprecated
public AsyncSubscription subscribeAsync(String subj, MessageHandler cb) {
public AsyncSubscription subscribeAsync(String subj, MessageHandler cb) throws IOException {
return (AsyncSubscription) subscribe(subj, null, cb);
}

Expand All @@ -1879,13 +1880,13 @@ private void addSubscription(SubscriptionImpl sub) {
}

@Override
public SyncSubscription subscribeSync(String subject, String queue) {
public SyncSubscription subscribeSync(String subject, String queue) throws IOException {
return (SyncSubscription) subscribe(subject, queue, null,
createMsgChannel());
}

@Override
public SyncSubscription subscribeSync(String subject) {
public SyncSubscription subscribeSync(String subject) throws IOException {
return (SyncSubscription) subscribe(subject, null, null,
createMsgChannel());
}
Expand Down Expand Up @@ -2077,7 +2078,7 @@ public Message request(String subject, byte[] data) throws IOException, Interrup

// Creates the response subscription we will use for all new style responses. This will be on an _INBOX with an
// additional terminal token. The subscription will be on a wildcard.
private synchronized void createRespMux() {
private synchronized void createRespMux() throws IOException {
if (respMap != null) {
// Already setup for responses.
return;
Expand Down Expand Up @@ -2162,17 +2163,13 @@ public synchronized long getMaxPayload() {
}

// Assumes already have the lock
void sendSubscriptionMessage(SubscriptionImpl sub) {
void sendSubscriptionMessage(SubscriptionImpl sub) throws IOException {
// We will send these for all subs when we reconnect
// so that we can suppress here.
String queue = sub.getQueue();
String subLine = String.format(SUB_PROTO, sub.getSubject(),
(queue != null && !queue.isEmpty()) ? " " + queue : "", sub.getSid());
try {
bw.write(subLine.getBytes());
} catch (IOException e) {
// Ignore - FIXME: This should be thrown
}
bw.write(subLine.getBytes());
}

@Override
Expand Down
2 changes: 2 additions & 0 deletions src/test/java/io/nats/client/ConnectionImplTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ public void testSelectNextServer() throws Exception {

@Test
public void testFlushReconnectPendingItems() throws Exception {
thrown.expect(IOException.class);
try (ConnectionImpl c = (ConnectionImpl) newMockedConnection()) {
byte[] pingProtoBytes = ConnectionImpl.PING_PROTO.getBytes();
int pingProtoBytesLen = pingProtoBytes.length;
Expand Down Expand Up @@ -1913,6 +1914,7 @@ public void testSendProto() throws Exception {

@Test
public void testSendSubscriptionMessage() throws Exception {
thrown.expect(IOException.class);
try (ConnectionImpl c = (ConnectionImpl) newMockedConnection()) {
SubscriptionImpl mockSub = mock(SubscriptionImpl.class);
String subject = doReturn("foo").when(mockSub).getSubject();
Expand Down

0 comments on commit 7dfbf3a

Please sign in to comment.