Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Estimate message sizes and introduce reconnect in RexsterClient.

  • Loading branch information...
commit c7dd37fac2e886153013e6d6bec1505cb13e02c6 1 parent d8ec297
@spmallette spmallette authored
View
45 rexster-protocol/src/main/java/com/tinkerpop/rexster/client/RexsterClient.java
@@ -12,6 +12,7 @@
import org.glassfish.grizzly.Transport;
import org.glassfish.grizzly.WriteHandler;
import org.glassfish.grizzly.nio.NIOConnection;
+import org.glassfish.grizzly.nio.transport.TCPNIOTransport;
import org.msgpack.MessagePack;
import org.msgpack.template.Template;
import org.msgpack.type.Value;
@@ -27,6 +28,7 @@
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import static org.msgpack.template.Templates.TString;
@@ -45,16 +47,20 @@
private final MessagePack msgpack = new MessagePack();
private NIOConnection connection;
- private int timeout;
- private Transport transport;
+ private final int timeout;
+ private final TCPNIOTransport transport;
+ private final String host;
+ private final int port;
protected static ConcurrentHashMap<UUID, ArrayBlockingQueue<Object>> responses =
new ConcurrentHashMap<UUID, ArrayBlockingQueue<Object>>();
- protected RexsterClient(final int timeout, final NIOConnection connection, final Transport transport) {
+ protected RexsterClient(final String host, final int port, final int timeout, final NIOConnection connection, final TCPNIOTransport transport) {
this.timeout = timeout;
this.connection = connection;
this.transport = transport;
+ this.host = host;
+ this.port = port;
}
public List<Map<String,Value>> gremlin(final String script) throws Exception {
@@ -72,11 +78,22 @@ protected RexsterClient(final int timeout, final NIOConnection connection, final
final UUID requestId = msgToSend.requestAsUUID();
responses.put(requestId, responseQueue);
- try {
- this.sendRequest(msgToSend);
- } catch (Exception e) {
- responses.remove(requestId);
- throw e;
+ boolean sent = false;
+ int tries = 15;
+ while (tries > 0 && !sent) {
+ try {
+ this.sendRequest(msgToSend);
+ sent = true;
+ } catch (Exception e) {
+ tries--;
+ if (tries == 0) {
+ responses.remove(requestId);
+ throw e;
+ } else {
+ Thread.sleep(1000);
+ reopenConnectionSafe();
+ }
+ }
}
Object resultMessage;
@@ -140,11 +157,21 @@ void putResponse(final RexProMessage response) throws Exception {
}
}
+ private void reopenConnectionSafe() {
+ try {
+ final Future<Connection> future = this.transport.connect(host, port);
+ this.connection = (NIOConnection) future.get(this.timeout, TimeUnit.SECONDS);
+ this.connection.setMaxAsyncWriteQueueSize(1000000);
+ } catch (Exception e) {
+ // safe call to reopen...just swallow this
+ }
+ }
+
private void sendRequest(final RexProMessage toSend) throws Exception {
boolean sent = false;
int tries = 10;
while (tries > 0 && !sent) {
- if (toSend.estimateSize() + this.connection.getAsyncWriteQueue().spaceInBytes() <= this.connection.getMaxAsyncWriteQueueSize()) {
+ if (toSend.estimateMessageSize() + this.connection.getAsyncWriteQueue().spaceInBytes() <= this.connection.getMaxAsyncWriteQueueSize()) {
final GrizzlyFuture future = connection.write(toSend);
future.get(this.timeout, TimeUnit.SECONDS);
sent = true;
View
2  rexster-protocol/src/main/java/com/tinkerpop/rexster/client/RexsterClientFactory.java
@@ -44,7 +44,7 @@ public RexsterClient createClient(final String host, final int port, int connect
final NIOConnection connection = (NIOConnection) future.get(connectTimeout, TimeUnit.SECONDS);
connection.setMaxAsyncWriteQueueSize(1000000);
- final RexsterClient client = new RexsterClient(connectTimeout, connection, transport);
+ final RexsterClient client = new RexsterClient(host, port, connectTimeout, connection, transport);
handler.setClient(client);
return client;
}
View
16 rexster-protocol/src/main/java/com/tinkerpop/rexster/protocol/msg/ConsoleScriptResponseMessage.java
@@ -64,4 +64,20 @@
final ConsoleResultConverter converter = new ConsoleResultConverter(new StringWriter());
return converter.convert(result);
}
+
+ @Override
+ public int estimateMessageSize() {
+ return BASE_MESSAGE_SIZE + (Bindings == null ? 0 : Bindings.length) + estimateConsoleLineSize();
+ }
+
+ private int estimateConsoleLineSize() {
+ int size = 0;
+ if (ConsoleLines != null) {
+ for(String cl : ConsoleLines) {
+ size = size + cl.length();
+ }
+ }
+
+ return size;
+ }
}
View
5 rexster-protocol/src/main/java/com/tinkerpop/rexster/protocol/msg/ErrorResponseMessage.java
@@ -10,4 +10,9 @@
@Message
public class ErrorResponseMessage extends RexProMessage {
public String ErrorMessage;
+
+ @Override
+ public int estimateMessageSize() {
+ return BASE_MESSAGE_SIZE + (ErrorMessage == null ? 0 : ErrorMessage.length());
+ }
}
View
5 rexster-protocol/src/main/java/com/tinkerpop/rexster/protocol/msg/GraphSONScriptResponseMessage.java
@@ -19,4 +19,9 @@
public static byte[] convertResultToBytes(final Object result) throws Exception {
return converter.convert(result).toString().getBytes();
}
+
+ @Override
+ public int estimateMessageSize() {
+ return BASE_MESSAGE_SIZE + (Results == null ? 0 : Results.length) + (Bindings == null ? 0 : Bindings.length);
+ }
}
View
5 rexster-protocol/src/main/java/com/tinkerpop/rexster/protocol/msg/MsgPackScriptResponseMessage.java
@@ -18,4 +18,9 @@
public static byte[] convertResultToBytes(final Object result) throws Exception {
return converter.convert(result);
}
+
+ @Override
+ public int estimateMessageSize() {
+ return BASE_MESSAGE_SIZE + (Results == null ? 0 : Results.length) + (Bindings == null ? 0 : Bindings.length);
+ }
}
View
14 rexster-protocol/src/main/java/com/tinkerpop/rexster/protocol/msg/RexProMessage.java
@@ -9,7 +9,12 @@
*
* @author Stephen Mallette (http://stephen.genoprime.com)
*/
-public class RexProMessage {
+public abstract class RexProMessage {
+
+ /**
+ * Constant that represents the size of a RexProMessage.
+ */
+ protected static final int BASE_MESSAGE_SIZE = 36;
/**
* The standard value for an empty session.
@@ -61,7 +66,8 @@ public void setRequestAsUUID(final UUID request) {
this.Request = BitWorks.convertUUIDToByteArray(request);
}
- public int estimateSize() {
- return 16;
- }
+ /**
+ * @return the estimated size of the message in bytes.
+ */
+ public abstract int estimateMessageSize();
}
View
7 rexster-protocol/src/main/java/com/tinkerpop/rexster/protocol/msg/ScriptRequestMessage.java
@@ -27,7 +27,10 @@ public RexsterBindings getBindings() throws IOException, ClassNotFoundException
}
@Override
- public int estimateSize() {
- return super.estimateSize() + LanguageName.length() + Script.length() + Bindings.length;
+ public int estimateMessageSize() {
+ return BASE_MESSAGE_SIZE
+ + (LanguageName == null ? 0 : LanguageName.length())
+ + (Script == null ? 0 :Script.length())
+ + (Bindings == null ? 0 : Bindings.length);
}
}
View
7 rexster-protocol/src/main/java/com/tinkerpop/rexster/protocol/msg/SessionRequestMessage.java
@@ -17,4 +17,11 @@
public byte Channel;
public String Username;
public String Password;
+
+ @Override
+ public int estimateMessageSize() {
+ return BASE_MESSAGE_SIZE + 1
+ + (Username == null ? 0 : Username.length())
+ + (Password == null ? 0 :Password.length());
+ }
}
View
16 rexster-protocol/src/main/java/com/tinkerpop/rexster/protocol/msg/SessionResponseMessage.java
@@ -11,4 +11,20 @@
@Message
public class SessionResponseMessage extends RexProMessage {
public String[] Languages;
+
+ @Override
+ public int estimateMessageSize() {
+ return BASE_MESSAGE_SIZE + estimateLanguagesSize();
+ }
+
+ private int estimateLanguagesSize() {
+ int size = 0;
+ if (Languages != null) {
+ for(String l : Languages) {
+ size = size + l.length();
+ }
+ }
+
+ return size;
+ }
}
View
BIN  rexster-server/data/graph-example-1/tinkergraph.dat
Binary file not shown
View
BIN  rexster-server/data/graph-example-2/tinkergraph.dat
Binary file not shown
Please sign in to comment.
Something went wrong with that request. Please try again.