Skip to content

Commit

Permalink
[ADDED] Lame Duck Mode Notifications (#337)
Browse files Browse the repository at this point in the history
[Added] Lame Duck Mode Notification

Co-authored-by: Rick Hightower <rick.hightower@aboutobjects.com>
  • Loading branch information
RichardHightower and Rick Hightower committed Aug 21, 2020
1 parent d859b6d commit 031b245
Show file tree
Hide file tree
Showing 8 changed files with 110 additions and 16 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -1,7 +1,7 @@

# Change Log

## Version 2.7.0
## Version 2.8.0

- [ADDED] #323 Nats.connect v2 credentials (@olicuzo)
- [CHANGED] #320 Update MAINTAINERS.md (@gcolliso)
Expand Down
8 changes: 4 additions & 4 deletions README.md
Expand Up @@ -52,9 +52,9 @@ The java-nats client is provided in a single jar file, with a single external de

### Downloading the Jar

You can download the latest jar at [https://search.maven.org/remotecontent?filepath=io/nats/jnats/2.7.0/jnats-2.7.0.jar](https://search.maven.org/remotecontent?filepath=io/nats/jnats/2.7.0/jnats-2.7.0.jar).
You can download the latest jar at [https://search.maven.org/remotecontent?filepath=io/nats/jnats/2.8.0/jnats-2.8.0.jar](https://search.maven.org/remotecontent?filepath=io/nats/jnats/2.8.0/jnats-2.8.0.jar).

The examples are available at [https://search.maven.org/remotecontent?filepath=io/nats/jnats/2.7.0/jnats-2.7.0-examples.jar](https://search.maven.org/remotecontent?filepath=io/nats/jnats/2.7.0/jnats-2.7.0-examples.jar).
The examples are available at [https://search.maven.org/remotecontent?filepath=io/nats/jnats/2.8.0/jnats-2.8.0-examples.jar](https://search.maven.org/remotecontent?filepath=io/nats/jnats/2.8.0/jnats-2.8.0-examples.jar).

To use NKeys, you will need the ed25519 library, which can be downloaded at [https://repo1.maven.org/maven2/net/i2p/crypto/eddsa/0.3.0/eddsa-0.3.0.jar](https://repo1.maven.org/maven2/net/i2p/crypto/eddsa/0.3.0/eddsa-0.3.0.jar).

Expand All @@ -64,7 +64,7 @@ The NATS client is available in the Maven central repository, and can be importe

```groovy
dependencies {
implementation 'io.nats:jnats:2.7.0'
implementation 'io.nats:jnats:2.8.0'
}
```

Expand All @@ -90,7 +90,7 @@ The NATS client is available on the Maven central repository, and can be importe
<dependency>
<groupId>io.nats</groupId>
<artifactId>jnats</artifactId>
<version>2.7.0</version>
<version>2.8.0</version>
</dependency>
```

Expand Down
4 changes: 2 additions & 2 deletions build.gradle
Expand Up @@ -15,10 +15,10 @@ plugins {
// Update version here, repeated check-ins not into master will have snapshot on them
// Be sure to update Nats.java with the latest version, the change log and the package-info.java
def versionMajor = 2
def versionMinor = 7
def versionMinor = 8
def versionPatch = 0
def versionModifier = ""
def jarVersion = "2.7.0"
def jarVersion = "2.8.0"
def branch = System.getenv("TRAVIS_BRANCH") != null ? System.getenv("TRAVIS_BRANCH") : "";
def tag = System.getenv("TRAVIS_TAG") != null ? System.getenv("TRAVIS_TAG") : "";
def useSigning = "master".equals(branch) || (!"".equals(tag) && tag.equals(branch)) // tag will be the branch on a tag event for travis
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/io/nats/client/ConnectionListener.java
Expand Up @@ -30,7 +30,9 @@ public enum Events {
/** The connection was reconnected and the server has been notified of all subscriptions. */
RESUBSCRIBED("nats: subscriptions re-established"),
/** The connection was told about new servers from, from the current server. */
DISCOVERED_SERVERS("nats: discovered servers");
DISCOVERED_SERVERS("nats: discovered servers"),
/** Server Sent a lame duck mode. */
LAME_DUCK("nats: lame duck mode");

private String event;

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/nats/client/Nats.java
Expand Up @@ -72,7 +72,7 @@ public class Nats {
/**
* Current version of the library - {@value #CLIENT_VERSION}
*/
public static final String CLIENT_VERSION = "2.7.0";
public static final String CLIENT_VERSION = "2.8.0";

/**
* Current language of the library - {@value #CLIENT_LANGUAGE}
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/io/nats/client/impl/NatsConnection.java
Expand Up @@ -1344,6 +1344,10 @@ void handleInfo(String infoJson) {
if (urls != null && urls.length > 0) {
processConnectionEvent(Events.DISCOVERED_SERVERS);
}

if (serverInfo.isLameDuckMode()) {
processConnectionEvent(Events.LAME_DUCK);
}
}

void queueOutgoing(NatsMessage msg) {
Expand Down
15 changes: 14 additions & 1 deletion src/main/java/io/nats/client/impl/NatsServerInfo.java
Expand Up @@ -31,6 +31,7 @@ class NatsServerInfo {
static final String CONNECT_URLS = "connect_urls";
static final String PROTOCOL_VERSION = "proto";
static final String NONCE = "nonce";
static final String LAME_DUCK_MODE = "ldm";

private String serverId;
private String version;
Expand All @@ -44,12 +45,17 @@ class NatsServerInfo {
private String rawInfoJson;
private int protocolVersion;
private byte[] nonce;
private boolean lameDuckMode;

public NatsServerInfo(String json) {
this.rawInfoJson = json;
parseInfo(json);
}

public boolean isLameDuckMode() {
return lameDuckMode;
}

public String getServerId() {
return this.serverId;
}
Expand Down Expand Up @@ -106,6 +112,7 @@ public String getRawJson() {
private static final String grabObject = "\\{(.+?)\\}";

void parseInfo(String jsonString) {
Pattern lameDuckMode = Pattern.compile("\""+LAME_DUCK_MODE+"\":" + grabBoolean, Pattern.CASE_INSENSITIVE);
Pattern serverIdRE = Pattern.compile("\""+SERVER_ID+"\":" + grabString, Pattern.CASE_INSENSITIVE);
Pattern versionRE = Pattern.compile("\""+VERSION+"\":" + grabString, Pattern.CASE_INSENSITIVE);
Pattern goRE = Pattern.compile("\""+GO+"\":" + grabString, Pattern.CASE_INSENSITIVE);
Expand Down Expand Up @@ -168,7 +175,13 @@ void parseInfo(String jsonString) {
if (m.find()) {
this.tlsRequired = Boolean.parseBoolean(m.group(1));
}


m = lameDuckMode.matcher(jsonString);
if (m.find()) {
this.lameDuckMode = Boolean.parseBoolean(m.group(1));
}


m = portRE.matcher(jsonString);
if (m.find()) {
this.port = Integer.parseInt(m.group(1));
Expand Down
87 changes: 81 additions & 6 deletions src/test/java/io/nats/client/impl/InfoHandlerTests.java
Expand Up @@ -13,18 +13,16 @@

package io.nats.client.impl;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import io.nats.client.*;
import org.junit.Test;

import io.nats.client.Connection;
import io.nats.client.NatsServerProtocolMock;
import io.nats.client.Nats;
import static org.junit.Assert.*;

public class InfoHandlerTests {
@Test
Expand All @@ -44,6 +42,8 @@ public void testInitialInfo() throws IOException, InterruptedException {
}
}



@Test
public void testUnsolicitedInfo() throws IOException, InterruptedException, ExecutionException {
String customInfo = "{\"server_id\":\"myid\"}";
Expand Down Expand Up @@ -103,4 +103,79 @@ public void testUnsolicitedInfo() throws IOException, InterruptedException, Exec
}
}
}



@Test
public void testLDM() throws IOException, InterruptedException, ExecutionException, TimeoutException {
String customInfo = "{\"server_id\":\"myid\", \"ldm\":true}";
CompletableFuture<Boolean> gotPong = new CompletableFuture<>();
CompletableFuture<Boolean> sendInfo = new CompletableFuture<>();
CompletableFuture<ConnectionListener.Events> connectLDM = new CompletableFuture<>();

NatsServerProtocolMock.Customizer infoCustomizer = (ts, r, w) -> {

// Wait for client to be ready.
try {
sendInfo.get();
} catch (Exception e) {
// return, we will fail the test
gotPong.cancel(true);
return;
}

System.out.println("*** Mock Server @" + ts.getPort() + " sending INFO ...");
w.write("INFO {\"server_id\":\"replacement\"}\r\n");
w.flush();

System.out.println("*** Mock Server @" + ts.getPort() + " sending PING ...");
w.write("PING\r\n");
w.flush();

String pong = "";

System.out.println("*** Mock Server @" + ts.getPort() + " waiting for PONG ...");
try {
pong = r.readLine();
} catch (Exception e) {
gotPong.cancel(true);
return;
}

if (pong != null && pong.startsWith("PONG")) {
System.out.println("*** Mock Server @" + ts.getPort() + " got PONG ...");
gotPong.complete(Boolean.TRUE);
} else {
System.out.println("*** Mock Server @" + ts.getPort() + " got something else... " + pong);
gotPong.complete(Boolean.FALSE);
}
};

try (NatsServerProtocolMock ts = new NatsServerProtocolMock(infoCustomizer, customInfo)) {

Options options = new Options.Builder().server(ts.getURI()).connectionListener(new ConnectionListener() {
@Override
public void connectionEvent(Connection conn, Events type) {
if (type.equals(Events.LAME_DUCK)) connectLDM.complete(type);
}
}).build();

Connection nc = Nats.connect(options);
try {
assertTrue("Connected Status", Connection.Status.CONNECTED == nc.getStatus());
assertEquals("got custom info", "myid", ((NatsConnection) nc).getInfo().getServerId());
sendInfo.complete(Boolean.TRUE);

assertTrue("Got pong.", gotPong.get().booleanValue()); // Server round tripped so we should have new info
assertEquals("got replacement info", "replacement", ((NatsConnection) nc).getInfo().getServerId());
} finally {
nc.close();
assertTrue("Closed Status", Connection.Status.CLOSED == nc.getStatus());
}
}

ConnectionListener.Events event = connectLDM.get(5, TimeUnit.SECONDS);
assertEquals(event, ConnectionListener.Events.LAME_DUCK);
System.out.println(event);
}
}

0 comments on commit 031b245

Please sign in to comment.