Skip to content

Commit

Permalink
Merge pull request #122 from zznate/morecleanup
Browse files Browse the repository at this point in the history
Some moves
  • Loading branch information
zznate committed Feb 27, 2013
2 parents c0e5bfe + d0ae77c commit fe33afd
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 95 deletions.
97 changes: 13 additions & 84 deletions src/main/java/org/usergrid/vx/experimental/IntraHandlerJson.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.codehaus.jackson.map.ObjectMapper;
import org.usergrid.vx.handler.http.OperationsRequestHandler;
import org.usergrid.vx.handler.http.TimeoutHandler;
import org.usergrid.vx.server.IntravertCassandraServer;
import org.vertx.java.core.Handler;
import org.vertx.java.core.Vertx;
import org.vertx.java.core.buffer.Buffer;
Expand All @@ -31,110 +32,38 @@

public class IntraHandlerJson implements Handler<HttpServerRequest>{

static IntraService is = new IntraService();
static ObjectMapper mapper = new ObjectMapper();

private Vertx vertx;

public IntraHandlerJson(Vertx vertx){
public IntraHandlerJson(Vertx vertx) {
super();
this.vertx=vertx;
registerRequestHandler();
this.vertx = vertx;
IntravertCassandraServer.registerRequestHandler(vertx);
}

@Override
public void handle(final HttpServerRequest request) {
request.bodyHandler( new Handler<Buffer>() {
public void handle(Buffer buffer) {
/*
* time to rip band aid
boolean asyncRequestsEnabled = Boolean.valueOf(
System.getProperty("async-requests-enabled", "false"));
if (asyncRequestsEnabled || true) {
handleRequestAsync(request, buffer);
} else {
handleRequest(request, buffer);
}
*/
handleRequestAsync(request, buffer);
}
});
}

private void handleRequest(HttpServerRequest request, Buffer buffer) {
IntraRes res = new IntraRes();
IntraReq req = null;
try {
req = mapper.readValue(buffer.getBytes(), IntraReq.class);
} catch (IOException e) {
e.printStackTrace();
}
is.handleIntraReq(req,res,vertx);
String value = null;
try {
value = mapper.writeValueAsString(res);
} catch (IOException e) {
e.printStackTrace();
}
request.response.end(value);
}

private void handleRequestAsync(final HttpServerRequest request, Buffer buffer) {
IntraReq req = null;
//JsonObject j = new JsonObject(buffer);
try {
req = mapper.readValue(buffer.getBytes(), IntraReq.class);
} catch (IOException e) {
e.printStackTrace();
}
vertx.eventBus().send("request.json", req.toJson(), new Handler<Message<JsonObject>>() {
@Override
public void handle(Message<JsonObject> event) {
request.response.end(event.body.toString());
}
});
private void handleRequestAsync(final HttpServerRequest request, Buffer buffer) {
IntraReq req = null;
try {
req = mapper.readValue(buffer.getBytes(), IntraReq.class);
} catch (IOException e) {
//TODO we need to return something here.
e.printStackTrace();
}

private void registerRequestHandler() {
vertx.eventBus().registerHandler("request.json", new Handler<Message<JsonObject>>() {
vertx.eventBus().send("request.json", req.toJson(), new Handler<Message<JsonObject>>() {
@Override
public void handle(Message<JsonObject> event) {
AtomicInteger idGenerator = new AtomicInteger(0);
JsonArray operations = event.body.getArray("e");
JsonObject operation = (JsonObject) operations.get(idGenerator.get());
Long timeout = getOperationTime(operation);

operation.putNumber("id", idGenerator.get());
operation.putObject("state", new JsonObject()
.putArray("components", new JsonArray()
.add("name")
.add("value")));
idGenerator.incrementAndGet();

OperationsRequestHandler operationsRequestHandler = new OperationsRequestHandler(idGenerator,
operations, event, vertx);
TimeoutHandler timeoutHandler = new TimeoutHandler(operationsRequestHandler);
long timerId = vertx.setTimer(timeout, timeoutHandler);
operationsRequestHandler.setTimerId(timerId);

vertx.eventBus().send("request." + operation.getString("type").toLowerCase(), operation,
operationsRequestHandler);
request.response.end(event.body.toString());
}
});
}

// This method is currently duplicated in OperationsRequestHandler. We could move it to
// HandlerUtils but I am holding off for now because if/when we start using strongly
// typed objects again for the request, response, etc., this method would make sense
// as a property if a parameters object if a such a class is introduced.
private Long getOperationTime(JsonObject operation) {
JsonObject params = operation.getObject("op");
Long timeout = params.getLong("timeout");
if (timeout == null) {
timeout = 10000L;
}
return timeout;
}

}
46 changes: 35 additions & 11 deletions src/main/java/org/usergrid/vx/server/IntravertCassandraServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package org.usergrid.vx.server;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.cassandra.service.CassandraDaemon;
import org.slf4j.Logger;
Expand All @@ -24,20 +25,15 @@
import org.usergrid.vx.experimental.IntraHandlerJsonSmile;
import org.usergrid.vx.handler.http.HelloHandler;
import org.usergrid.vx.handler.http.NoMatchHandler;
import org.usergrid.vx.server.operations.AssumeHandler;
import org.usergrid.vx.server.operations.ConsistencyHandler;
import org.usergrid.vx.server.operations.CqlQueryHandler;
import org.usergrid.vx.server.operations.CreateColumnFamilyHandler;
import org.usergrid.vx.server.operations.CreateKeyspaceHandler;
import org.usergrid.vx.server.operations.GetHandler;
import org.usergrid.vx.server.operations.ListKeyspacesHandler;
import org.usergrid.vx.server.operations.SetColumnFamilyHandler;
import org.usergrid.vx.server.operations.SetHandler;
import org.usergrid.vx.server.operations.SetKeyspaceHandler;
import org.usergrid.vx.server.operations.SliceHandler;
import org.usergrid.vx.handler.http.OperationsRequestHandler;
import org.usergrid.vx.handler.http.TimeoutHandler;
import org.usergrid.vx.server.operations.*;
import org.vertx.java.core.Handler;
import org.vertx.java.core.Vertx;
import org.vertx.java.core.eventbus.Message;
import org.vertx.java.core.http.RouteMatcher;
import org.vertx.java.core.json.JsonArray;
import org.vertx.java.core.json.JsonObject;

public class IntravertCassandraServer implements CassandraDaemon.Server {

Expand Down Expand Up @@ -76,6 +72,34 @@ public boolean isRunning() {
return running.get();
}

public static void registerRequestHandler(final Vertx x) {
x.eventBus().registerHandler("request.json", new Handler<Message<JsonObject>>() {
@Override
public void handle(Message<JsonObject> event) {
AtomicInteger idGenerator = new AtomicInteger(0);
JsonArray operations = event.body.getArray("e");
JsonObject operation = (JsonObject) operations.get(idGenerator.get());
Long timeout = HandlerUtils.getOperationTime(operation);

operation.putNumber("id", idGenerator.get());
operation.putObject("state", new JsonObject()
.putArray("components", new JsonArray()
.add("name")
.add("value")));
idGenerator.incrementAndGet();

OperationsRequestHandler operationsRequestHandler = new OperationsRequestHandler(idGenerator,
operations, event, x);
TimeoutHandler timeoutHandler = new TimeoutHandler(operationsRequestHandler);
long timerId = x.setTimer(timeout, timeoutHandler);
operationsRequestHandler.setTimerId(timerId);

x.eventBus().send("request." + operation.getString("type").toLowerCase(), operation,
operationsRequestHandler);
}
});
}

public static void registerOperationHandlers(Vertx x) {
x.eventBus().registerHandler("request.createkeyspace", new CreateKeyspaceHandler());
x.eventBus().registerHandler("request.setkeyspace", new SetKeyspaceHandler());
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/org/usergrid/vx/server/operations/HandlerUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,14 @@ public static void write(List<IMutation> mutations, Message<JsonObject> event, I
.putString("exceptionId", id.toString()));
}
}


public static Long getOperationTime(JsonObject operation) {
JsonObject params = operation.getObject("op");
Long timeout = params.getLong("timeout");
if (timeout == null) {
timeout = 10000L;
}
return timeout;
}
}

0 comments on commit fe33afd

Please sign in to comment.