Permalink
Browse files

Merge pull request #121 from jsanda/op-timeout

adding back support for setting timeout per operation
  • Loading branch information...
2 parents 10dd585 + 6f908a3 commit 264dae8f1b3a2a214012cfee89b5a7bd86b57eb9 @zznate committed Feb 27, 2013
@@ -15,12 +15,6 @@
*/
package org.usergrid.vx.experimental;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReentrantLock;
-
import org.codehaus.jackson.map.ObjectMapper;
import org.usergrid.vx.handler.http.OperationsRequestHandler;
import org.usergrid.vx.handler.http.TimeoutHandler;
@@ -32,6 +26,9 @@
import org.vertx.java.core.json.JsonArray;
import org.vertx.java.core.json.JsonObject;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
public class IntraHandlerJson implements Handler<HttpServerRequest>{
static IntraService is = new IntraService();
@@ -99,32 +96,45 @@ public void handle(Message<JsonObject> event) {
});
}
- private void registerRequestHandler() {
- vertx.eventBus().registerHandler("request.json", new Handler<Message<JsonObject>>() {
- @Override
- public void handle(Message<JsonObject> event) {
- AtomicInteger idGenerator = new AtomicInteger(0);
- JsonArray operations = event.body.getArray("e");
- JsonObject operation = (JsonObject) operations.get(idGenerator.get());
- operation.putNumber("id", idGenerator.get());
- operation.putObject("state", new JsonObject()
- .putArray("components", new JsonArray()
- .add("name")
- .add("value")));
- idGenerator.incrementAndGet();
+ private void registerRequestHandler() {
+ vertx.eventBus().registerHandler("request.json", new Handler<Message<JsonObject>>() {
+ @Override
+ public void handle(Message<JsonObject> event) {
+ AtomicInteger idGenerator = new AtomicInteger(0);
+ JsonArray operations = event.body.getArray("e");
+ JsonObject operation = (JsonObject) operations.get(idGenerator.get());
+ Long timeout = getOperationTime(operation);
- OperationsRequestHandler operationsRequestHandler = new OperationsRequestHandler(idGenerator,
- operations, event, vertx);
- TimeoutHandler timeoutHandler = new TimeoutHandler(operationsRequestHandler);
- //TODO this is wrongeach operation has its own timeout, the request cant just set
- //global timeouts like this
- long timerId = vertx.setTimer(10000, timeoutHandler);
- operationsRequestHandler.setTimerId(timerId);
+ operation.putNumber("id", idGenerator.get());
+ operation.putObject("state", new JsonObject()
+ .putArray("components", new JsonArray()
+ .add("name")
+ .add("value")));
+ idGenerator.incrementAndGet();
- vertx.eventBus().send("request." + operation.getString("type").toLowerCase(), operation,
- operationsRequestHandler);
- }
- });
+ OperationsRequestHandler operationsRequestHandler = new OperationsRequestHandler(idGenerator,
+ operations, event, vertx);
+ TimeoutHandler timeoutHandler = new TimeoutHandler(operationsRequestHandler);
+ long timerId = vertx.setTimer(timeout, timeoutHandler);
+ operationsRequestHandler.setTimerId(timerId);
+
+ vertx.eventBus().send("request." + operation.getString("type").toLowerCase(), operation,
+ operationsRequestHandler);
+ }
+ });
+ }
+
+ // This method is currently duplicated in OperationsRequestHandler. We could move it to
+ // HandlerUtils but I am holding off for now because if/when we start using strongly
+ // typed objects again for the request, response, etc., this method would make sense
+ // as a property if a parameters object if a such a class is introduced.
+ private Long getOperationTime(JsonObject operation) {
+ JsonObject params = operation.getObject("op");
+ Long timeout = params.getLong("timeout");
+ if (timeout == null) {
+ timeout = 10000L;
}
+ return timeout;
+ }
}
@@ -1,136 +1,150 @@
package org.usergrid.vx.handler.http;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReentrantLock;
-
-
import org.vertx.java.core.Handler;
import org.vertx.java.core.Vertx;
import org.vertx.java.core.eventbus.Message;
import org.vertx.java.core.json.JsonArray;
import org.vertx.java.core.json.JsonObject;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+
public class OperationsRequestHandler implements Handler<Message<JsonObject>> {
- private AtomicInteger idGenerator;
+ private AtomicInteger idGenerator;
- private JsonArray operations;
+ private JsonArray operations;
- private Message<JsonObject> originalMessage;
+ private Message<JsonObject> originalMessage;
- private JsonObject results;
+ private JsonObject results;
- private JsonObject state;
+ private JsonObject state;
- private boolean timedOut = false;
+ private boolean timedOut = false;
- private ReentrantLock timeoutLock = new ReentrantLock();
+ private ReentrantLock timeoutLock = new ReentrantLock();
- private long timerId;
-
- //TODO static ?
- private Vertx vertx;
+ private long timerId;
- public OperationsRequestHandler(AtomicInteger idGenerator, JsonArray operations,
- Message<JsonObject> originalMessage, Vertx vertx) {
- this.idGenerator = idGenerator;
- this.operations = operations;
- this.originalMessage = originalMessage;
- this.vertx = vertx;
+ //TODO static ?
+ private Vertx vertx;
- results = new JsonObject();
- results.putObject("opsRes", new JsonObject());
- results.putString("exception", null);
- results.putString("exceptionId", null);
+ public OperationsRequestHandler(AtomicInteger idGenerator, JsonArray operations,
+ Message<JsonObject> originalMessage, Vertx vertx) {
+ this.idGenerator = idGenerator;
+ this.operations = operations;
+ this.originalMessage = originalMessage;
+ this.vertx = vertx;
- state = new JsonObject();
- }
+ results = new JsonObject();
+ results.putObject("opsRes", new JsonObject());
+ results.putString("exception", null);
+ results.putString("exceptionId", null);
- public void setTimerId(long timerId) {
- this.timerId = timerId;
- }
+ state = new JsonObject();
+ }
+
+ public void setTimerId(long timerId) {
+ this.timerId = timerId;
+ }
+
+ @Override
+ public void handle(Message<JsonObject> event) {
+ vertx.cancelTimer(timerId);
+
+ Integer currentId = idGenerator.get();
+ Integer opId = currentId - 1;
- @Override
- public void handle(Message<JsonObject> event) {
- vertx.cancelTimer(timerId);
-
- Integer currentId = idGenerator.get();
- Integer opId = currentId - 1;
-
- String exceptionId = event.body.getString("exceptionId");
- String exception = event.body.getString("exception");
-
- if (exception != null || exceptionId != null) {
- results.putString("exception", exception);
- results.putString("exceptionId", exceptionId);
-
- sendResults();
- }
-
- Map<String, Object> map = event.body.toMap();
- Object opResult = map.get(opId.toString());
-
- // Doing the instanceof check here sucks but there are two reasons why it is
- // here at least for now. First, with this refactoring I do not want to change
- // behavior. To the greatest extent possible, I want integration tests to pass
- // as is. Secondly, I do not want to duplicate logic across each operation
- // handler. So far the operation handler does not need to worry about the
- // format of the response that is sent back to the client. That is done here.
- // The operation handler just provides its own specific response that is keyed
- // off of its operation id.
- //
- // John Sanda
- if (opResult instanceof String) {
- results.getObject("opsRes").putString(opId.toString(), (String) opResult);
- } else if (opResult instanceof Number) {
- results.getObject("opsRes").putNumber(opId.toString(), (Number) opResult);
- } else if (opResult instanceof JsonObject) {
- results.getObject("opsRes").putObject(opId.toString(), (JsonObject) opResult);
- } else if (opResult instanceof List) {
- results.getObject("opsRes").putArray(opId.toString(), new JsonArray((List) opResult));
- } else {
- throw new IllegalArgumentException(opResult.getClass() + " is not a supported result type");
- }
-
- if (idGenerator.get() < operations.size()) {
- JsonObject operation = (JsonObject) operations.get(idGenerator.get());
- operation.putNumber("id", idGenerator.get());
-
- if (event.body.getObject("state") != null) {
- state.mergeIn(event.body.getObject("state"));
- }
- operation.putObject("state", state.copy());
- idGenerator.incrementAndGet();
- TimeoutHandler timeoutHandler = new TimeoutHandler(this);
- timerId = vertx.setTimer(10000, timeoutHandler);
- vertx.eventBus().send("request." + operation.getString("type").toLowerCase(), operation, this);
- } else {
- sendResults();
- }
+ String exceptionId = event.body.getString("exceptionId");
+ String exception = event.body.getString("exception");
+
+ if (exception != null || exceptionId != null) {
+ results.putString("exception", exception);
+ results.putString("exceptionId", exceptionId);
+
+ sendResults();
}
- private void sendResults() {
- try {
- timeoutLock.lock();
- if (!timedOut) {
- originalMessage.reply(results);
- }
- } finally {
- timeoutLock.unlock();
- }
+ Map<String, Object> map = event.body.toMap();
+ Object opResult = map.get(opId.toString());
+
+ // Doing the instanceof check here sucks but there are two reasons why it is
+ // here at least for now. First, with this refactoring I do not want to change
+ // behavior. To the greatest extent possible, I want integration tests to pass
+ // as is. Secondly, I do not want to duplicate logic across each operation
+ // handler. So far the operation handler does not need to worry about the
+ // format of the response that is sent back to the client. That is done here.
+ // The operation handler just provides its own specific response that is keyed
+ // off of its operation id.
+ //
+ // John Sanda
+ if (opResult instanceof String) {
+ results.getObject("opsRes").putString(opId.toString(), (String) opResult);
+ } else if (opResult instanceof Number) {
+ results.getObject("opsRes").putNumber(opId.toString(), (Number) opResult);
+ } else if (opResult instanceof JsonObject) {
+ results.getObject("opsRes").putObject(opId.toString(), (JsonObject) opResult);
+ } else if (opResult instanceof List) {
+ results.getObject("opsRes").putArray(opId.toString(), new JsonArray((List) opResult));
+ } else {
+ throw new IllegalArgumentException(opResult.getClass() + " is not a supported result type");
}
- public void timeout() {
- try {
- timeoutLock.lock();
- timedOut = true;
- results.putString("exception", "Operation timed out.");
- results.putString("exceptionId", Integer.toString(idGenerator.get() - 1));
- originalMessage.reply(results);
- } finally {
- timeoutLock.unlock();
- }
+ if (idGenerator.get() < operations.size()) {
+ JsonObject operation = (JsonObject) operations.get(idGenerator.get());
+ operation.putNumber("id", idGenerator.get());
+
+ Long timeout = getOperationTimeout(operation);
+
+ if (event.body.getObject("state") != null) {
+ state.mergeIn(event.body.getObject("state"));
+ }
+ operation.putObject("state", state.copy());
+ idGenerator.incrementAndGet();
+ TimeoutHandler timeoutHandler = new TimeoutHandler(this);
+ timerId = vertx.setTimer(timeout, timeoutHandler);
+ vertx.eventBus().send("request." + operation.getString("type").toLowerCase(), operation, this);
+ } else {
+ sendResults();
+ }
+ }
+
+ // This method is currently duplicated in IntraHandlerJson. We could move it to
+ // HandlerUtils but I am holding off for now because if/when we start using strongly
+ // typed objects again for the request, response, etc., this method would make sense
+ // as a property if a parameters object if a such a class is introduced.
+ private Long getOperationTimeout(JsonObject operation) {
+ JsonObject params = operation.getObject("op");
+ Long timeout = params.getLong("timeout");
+ if (timeout == null) {
+ timeout = 10000L;
+ }
+ return timeout;
+ }
+
+ private void sendResults() {
+ try {
+ timeoutLock.lock();
+ if (!timedOut) {
+ originalMessage.reply(results);
+ }
+ } finally {
+ timeoutLock.unlock();
+ }
+ }
+
+ public void timeout() {
+ try {
+ timeoutLock.lock();
+ timedOut = true;
+ results.putString("exception", "Operation timed out.");
+ results.putString("exceptionId", Integer.toString(idGenerator.get() - 1));
+ originalMessage.reply(results);
+ } finally {
+ timeoutLock.unlock();
}
+ }
}
Oops, something went wrong.

0 comments on commit 264dae8

Please sign in to comment.