Skip to content

Commit

Permalink
Merge branch 'master' into virgil-rest-handler
Browse files Browse the repository at this point in the history
  • Loading branch information
zznate committed Mar 2, 2013
2 parents e07787f + 430c51b commit 4ee1981
Show file tree
Hide file tree
Showing 29 changed files with 1,255 additions and 569 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ Intravert is more then an RPC library, query language, or transport for Cassandr
* No more dealing with byte[] or ByteBuffers. Intravert lets users work with simple familiar objects like String or Integer. See [Types and Composites](https://github.com/zznate/intravert-ug/wiki/Composites).

## Motivations

From an application standpoint, if you can't do sparse, wide rows, you break compatibility with 90% of Cassandra applications. So that rules out almost everything; if you can't provide the same data model, you're creating fragmentation, not pluggability.

Intravert was conceived of and designed by long time users of Cassandra who have written numerous real-world applications built on the existing Thrift API.

This API had it's warts for sure, but it was felt among us that the direction of the Cassandra project with regards to the introduction of CQL sidestepped some of the core reasons we chose Cassandra in the first place.
Expand Down
36 changes: 10 additions & 26 deletions src/main/java/org/usergrid/vx/client/IntraClient2.java
Original file line number Diff line number Diff line change
@@ -1,18 +1,5 @@
package org.usergrid.vx.client;

import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Writer;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.smile.SmileFactory;
import org.slf4j.Logger;
Expand All @@ -27,6 +14,10 @@
import org.vertx.java.core.http.HttpClientRequest;
import org.vertx.java.core.http.HttpClientResponse;

import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.CountDownLatch;

public class IntraClient2 {
private static Logger logger = LoggerFactory.getLogger(IntraClient.class);
private Vertx vertx;
Expand All @@ -43,12 +34,13 @@ public enum Transport { JSON, SMILE, XML }

public IntraClient2(String host,int port){
vertx = Vertx.newVertx();
httpClient = vertx.createHttpClient().setHost("localhost")
.setPort(8080).setMaxPoolSize(10).setKeepAlive(true);
httpClient = vertx.createHttpClient().setHost(host)
.setPort(port).setMaxPoolSize(10).setKeepAlive(true);
setTransport(Transport.JSON);
}

public IntraRes sendBlocking(IntraReq i) throws Exception {
final Buffer buffer = new Buffer();
final Buffer outRequest = new Buffer();
OutputStream st = new OutputStream(){
@Override
Expand Down Expand Up @@ -78,7 +70,7 @@ public void close() throws IOException {
};
mapper.writeValue( st, i);
final CountDownLatch doneSignal = new CountDownLatch(1);
final AtomicReference<IntraRes> ref = new AtomicReference<IntraRes>();

HttpClientRequest req = httpClient.request(METHOD,
endpoint, new Handler<HttpClientResponse>() {

Expand All @@ -87,15 +79,7 @@ public void handle(HttpClientResponse resp) {
resp.dataHandler(new Handler<Buffer>() {
@Override
public void handle(Buffer arg0) {
IntraRes ir = null;
try {
ir = mapper.readValue(arg0.getBytes(), IntraRes.class);
} catch (IOException e) {
//TODO how do we signal exception
//countdown on failed as well
//e.printStackTrace();
}
ref.set(ir);
buffer.appendBuffer(arg0);
}
});

Expand All @@ -112,7 +96,7 @@ protected void handle() {
req.putHeader(CONTENT_LENGTH, outRequest.length());
req.end(outRequest);
doneSignal.await();
return ref.get();
return mapper.readValue(buffer.getBytes(), IntraRes.class);
}

public Transport getTransport() {
Expand Down
109 changes: 23 additions & 86 deletions src/main/java/org/usergrid/vx/experimental/IntraHandlerJson.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,113 +15,50 @@
*/
package org.usergrid.vx.experimental;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.commons.lang.exception.ExceptionUtils;
import org.codehaus.jackson.map.ObjectMapper;
import org.usergrid.vx.handler.http.OperationsRequestHandler;
import org.usergrid.vx.handler.http.TimeoutHandler;
import org.vertx.java.core.Handler;
import org.vertx.java.core.Vertx;
import org.vertx.java.core.buffer.Buffer;
import org.vertx.java.core.eventbus.Message;
import org.vertx.java.core.http.HttpServerRequest;
import org.vertx.java.core.json.JsonArray;
import org.vertx.java.core.json.JsonObject;

import static org.jboss.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;

public class IntraHandlerJson implements Handler<HttpServerRequest>{

static IntraService is = new IntraService();
static ObjectMapper mapper = new ObjectMapper();

private Vertx vertx;

public IntraHandlerJson(Vertx vertx){
public IntraHandlerJson(Vertx vertx) {
super();
this.vertx=vertx;
registerRequestHandler();
this.vertx = vertx;
}

@Override
public void handle(final HttpServerRequest request) {
request.bodyHandler( new Handler<Buffer>() {
public void handle(Buffer buffer) {
/*
* time to rip band aid
boolean asyncRequestsEnabled = Boolean.valueOf(
System.getProperty("async-requests-enabled", "false"));
if (asyncRequestsEnabled || true) {
handleRequestAsync(request, buffer);
} else {
handleRequest(request, buffer);
}
*/
handleRequestAsync(request, buffer);
}
});
request.bodyHandler(new Handler<Buffer>() {
public void handle(Buffer buffer) {
handleRequestAsync(request, buffer);
}
});
}

private void handleRequest(HttpServerRequest request, Buffer buffer) {
IntraRes res = new IntraRes();
IntraReq req = null;
try {
req = mapper.readValue(buffer.getBytes(), IntraReq.class);
} catch (IOException e) {
e.printStackTrace();
private void handleRequestAsync(final HttpServerRequest request, Buffer buffer) {
IntraReq req = null;
try {
req = mapper.readValue(buffer.getBytes(), IntraReq.class);
vertx.eventBus().send("request.json", req.toJson(), new Handler<Message<JsonObject>>() {
@Override
public void handle(Message<JsonObject> event) {
request.response.end(event.body.toString());
}
is.handleIntraReq(req,res,vertx);
String value = null;
try {
value = mapper.writeValueAsString(res);
} catch (IOException e) {
e.printStackTrace();
}
request.response.end(value);
}

private void handleRequestAsync(final HttpServerRequest request, Buffer buffer) {
IntraReq req = null;
try {
req = mapper.readValue(buffer.getBytes(), IntraReq.class);
} catch (IOException e) {
e.printStackTrace();
}
vertx.eventBus().send("request.json", req.toJson(), new Handler<Message<JsonObject>>() {
@Override
public void handle(Message<JsonObject> event) {
request.response.end(event.body.toString());
}
});
}

private void registerRequestHandler() {
vertx.eventBus().registerHandler("request.json", new Handler<Message<JsonObject>>() {
@Override
public void handle(Message<JsonObject> event) {
AtomicInteger idGenerator = new AtomicInteger(0);
JsonArray operations = event.body.getArray("e");
JsonObject operation = (JsonObject) operations.get(idGenerator.get());
operation.putNumber("id", idGenerator.get());
operation.putObject("state", new JsonObject()
.putArray("components", new JsonArray()
.add("name")
.add("value")));
idGenerator.incrementAndGet();

OperationsRequestHandler operationsRequestHandler = new OperationsRequestHandler(idGenerator,
operations, event, vertx);
TimeoutHandler timeoutHandler = new TimeoutHandler(operationsRequestHandler);
long timerId = vertx.setTimer(10000, timeoutHandler);
operationsRequestHandler.setTimerId(timerId);

vertx.eventBus().send("request." + operation.getString("type").toLowerCase(), operation,
operationsRequestHandler);
}
});
});
} catch (Exception e) {
request.response.statusCode = BAD_REQUEST.getCode();
request.response.end(ExceptionUtils.getFullStackTrace(e));
}
}

}
5 changes: 4 additions & 1 deletion src/main/java/org/usergrid/vx/experimental/IntraService.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.vertx.java.core.Vertx;
import org.vertx.java.core.json.JsonArray;

public class IntraService {

Expand Down Expand Up @@ -82,7 +83,9 @@ public Boolean call() throws Exception {

// TODO remove unused method params
public static Object resolveObject(Object o, IntraReq req, IntraRes res,IntraState state, int i){
if (o instanceof Object[]){
if (o instanceof JsonArray){
return ((JsonArray) o).toArray();
} else if (o instanceof Object[]){
return o;
} else if (o instanceof Integer){
return o;
Expand Down
16 changes: 10 additions & 6 deletions src/main/java/org/usergrid/vx/experimental/IntraState.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,17 @@
*/
package org.usergrid.vx.experimental;

import org.apache.cassandra.db.ConsistencyLevel;
import org.usergrid.vx.experimental.filter.Filter;
import org.usergrid.vx.experimental.scan.ScanContext;
import org.usergrid.vx.experimental.scan.ScanFilter;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.cassandra.db.ConsistencyLevel;
import org.usergrid.vx.experimental.filter.Filter;
import org.usergrid.vx.experimental.scan.ScanContext;
import org.usergrid.vx.experimental.scan.ScanFilter;

/* class that holds properties for the request lifecycle */
public class IntraState {

Expand All @@ -47,7 +47,7 @@ public IntraState(){
Map<IntraMetaData,String> meta = new HashMap<IntraMetaData,String>();
//TODO separate per/request state from application/session state
static Map<String,Processor> processors = new HashMap<String,Processor>();
static Map<String,Filter> filters = new HashMap<String,Filter>();
public static Map<String,Filter> filters = new HashMap<String,Filter>();
static Map<String,MultiProcessor> multiProcessors = new HashMap<String,MultiProcessor>();
static Map<String,ServiceProcessor> serviceProcessors = new HashMap<String,ServiceProcessor>();
Filter currentFilter;
Expand Down Expand Up @@ -79,4 +79,8 @@ public int openScanner(ScanContext context) {
openedScanners.put(id, context);
return id;
}

public ConsistencyLevel getConsistency() {
return consistency;
}
}
4 changes: 2 additions & 2 deletions src/main/java/org/usergrid/vx/experimental/Operations.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ public static IntraOp setColumnFamilyOp(String columnFamily){
.set(COLUMN_FAMILY, columnFamily);
}

public static IntraOp setAutotimestampOp(){
return new IntraOp(IntraOp.Type.AUTOTIMESTAMP);
public static IntraOp setAutotimestampOp(boolean on){
return new IntraOp(IntraOp.Type.AUTOTIMESTAMP).set("autotimestamp", on);
}

public static IntraOp setOp(Object rowkey, Object columnName, Object columnValue){
Expand Down
Loading

0 comments on commit 4ee1981

Please sign in to comment.