Permalink
Browse files

reconnect when client-adapter goes away and comes back

  • Loading branch information...
1 parent 8be0983 commit b2bf958a277a3c9072f31da59415a21fc4e35877 Bruce Robbins committed Sep 15, 2011
Showing with 52 additions and 6 deletions.
  1. +52 −6 s4-tools/loadgenerator/src/main/java/io/s4/tools/loadgenerator/LoadGenerator.java
@@ -20,6 +20,7 @@
import java.io.BufferedReader;
import java.io.FileReader;
+import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.util.HashMap;
@@ -184,6 +185,8 @@ public static void main(String args[]) {
private int adjustedExpectedRate = 1;
private Map<Integer, EventTypeInfo> eventTypeInfoMap = new HashMap<Integer, EventTypeInfo>();
+ private Driver driver;
+ private boolean isConnected;
public int getEmitCount() {
return emitCount;
@@ -222,12 +225,8 @@ public void run() {
BufferedReader br = null;
Reader inputReader = null;
- Driver driver = null;
try {
- driver = new Driver(clientAdapterHost, clientAdapterPort);
- boolean init = driver.init();
- init &= driver.connect();
- if (!init) {
+ if (!connect()) {
System.err.println("Failed to initialize client adapter driver");
return;
}
@@ -268,7 +267,7 @@ public void run() {
}
Message message = new Message(eventTypeInfo.getStreamName(), eventTypeInfo.getClassName(), inputLine);
- driver.send(message);
+ sendMessage(message);
emitCount++;
} catch (JSONException je) {
je.printStackTrace();
@@ -311,6 +310,53 @@ public void run() {
}
}
}
+
+ public boolean connect() {
+ isConnected = false;
+ try {
+ System.out.println("Connecting...");
+ driver = new Driver(clientAdapterHost, clientAdapterPort);
+ boolean isInitialized = driver.init();
+ isConnected = isInitialized & driver.connect();
+ System.out.println("Connection made: " + isConnected);
+ return isConnected;
+ }
+ catch (IOException ioe) {
+ System.out.println("Connection made: " + isConnected);
+ return isConnected;
+ }
+ catch (NullPointerException npe) {
+ // there's a bug in the driver that causes a null pointer exception if
+ // if the target server is down
+ System.out.println("Connection made: " + isConnected);
+ return isConnected;
+ }
+ }
+
+ public boolean sendMessage(Message message) {
+ final int MAX_RETRY = 5;
+ boolean sent = false;
+ int backoff = 10;
+ for (int retries = 0; retries < MAX_RETRY; retries++) {
+ try {
+ if (!isConnected) {
+ throw new IOException("Driver not connected");
+ }
+ driver.send(message);
+ sent = true;
+ break;
+ }
+ catch (IOException ioe) {
+ try {
+ System.out.printf("Sleeping for %f seconds\n", backoff/1000.0);
+ Thread.sleep(backoff);
+ } catch (InterruptedException ie) {}
+ backoff = backoff*5;
+ connect();
+ }
+ }
+ return sent;
+ }
@SuppressWarnings("unchecked")
public void createEventTypeInfo(JSONObject classInfo) {

0 comments on commit b2bf958

Please sign in to comment.