Permalink
Browse files

java_nats_0.4.7

  • Loading branch information...
1 parent cc8d3cd commit 1a0c4c4f3f7056996155d38c130a4a58ba234ae4 @tyagihas committed Oct 22, 2012
Showing with 18 additions and 16 deletions.
  1. +0 −3 benchmark/org/nats/benchmark/PubSubPerf.java
  2. +18 −13 lib/org/nats/Connection.java
@@ -25,7 +25,6 @@ public static void main(String[] args) throws Exception {
int received = 0;
public void execute(Object o) {
received++;
- // System.out.println("Execute : " + (String)o);
if (received == loop) {
double elapsed = System.nanoTime() - start;
System.out.println();
@@ -43,8 +42,6 @@ public void execute(Object o) {
try {
for(int i = 1; i <= loop; i++) {
conn2.publish("test", val);
- // conn2.publish("test", "aaaa\r\nbbbb\r\ncccc\r\ndddd\r\n");
- // conn2.publish("test", new Integer(i).toString());
if (i % hash == 0)
System.out.print("+");
}
@@ -19,7 +19,7 @@
*/
public final class Connection {
- private static final String version = "0.4.6";
+ private static final String version = "0.4.7";
public static final int DEFAULT_PORT = 4222;
public static final String DEFAULT_URI = "nats://localhost:" + Integer.toString(DEFAULT_PORT);
@@ -82,7 +82,9 @@
private int bytes_sent;
private int msgs_received;
private int bytes_received;
-
+
+ private volatile boolean reconnecting;
+
static {
ssid = 1;
numConnections = 0;
@@ -140,6 +142,7 @@ private Connection(Properties popts, MsgHandler handler) throws IOException, Int
String[] uri = ((String)opts.get("uri")).split(":");
addr = new InetSocketAddress(uri[1].substring(2, uri[1].length()), Integer.parseInt(uri[2]));
timer = new Timer("NATS_Timer-" + numConnections);
+ reconnecting = false;
connect();
@@ -523,6 +526,7 @@ public void run() {
private void reconnect() {
boolean doReconnect = ((Boolean)opts.get("reconnect")).booleanValue();
+ processor.interrupt();
if (doReconnect) {
int max_reconnect_attempts = ((Integer)opts.get("max_reconnect_attempts")).intValue();
int reconnect_time_wait = ((Integer)opts.get("reconnect_time_wait")).intValue();
@@ -534,7 +538,8 @@ private void reconnect() {
if (isConnected()) {
sendConnectCommand();
sendSubscirptions();
- flushPending();
+ flushPending();
+ reconnecting = false;
break;
}
Thread.sleep(reconnect_time_wait);
@@ -545,14 +550,16 @@ private void reconnect() {
}
}
}
+ processor.run();
}
-
+
private class ReconnectTask extends TimerTask {
public void run() {
+ reconnecting = true;
reconnect();
}
}
-
+
// Dummy event handler
private MsgHandler emptyHandler = new MsgHandler() {};
@@ -607,10 +614,10 @@ public void run() {
} catch(AsynchronousCloseException ace) {
continue;
} catch (IOException e) {
- reconnect();
- if (!isConnected())
- // terminating background thread if reconnect fails
- break;
+ // skipping if reconnect already starts due to -ERR code
+ if (!reconnecting) reconnect();
+ // terminating background thread if reconnect fails
+ if (!isConnected()) break;
}
}
}
@@ -647,14 +654,12 @@ else if (comp(buf, PONG, 4)) {
handler = pongs.poll();
}
processEvent(null, handler);
- if (handler.caller != null)
- handler.caller.interrupt();
+ if (handler.caller != null) handler.caller.interrupt();
}
else if (comp(buf, PING, 4)) sendCommand(PONG_RESPONSE, PONG_RESPONSE_LEN, true);
else if (comp(buf, ERR, 4)) timer.schedule(new ReconnectTask(), 0);
else if (comp(buf, OK, 3)) {/* do nothing for now */}
- else if (comp(buf, INFO, 4))
- if (connectHandler != null) connectHandler.execute((Object)self);
+ else if (comp(buf, INFO, 4)) if (connectHandler != null) connectHandler.execute((Object)self);
}
break;
case AWAITING_MSG_PAYLOAD :

0 comments on commit 1a0c4c4

Please sign in to comment.