Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

Result mode plus other cleanups #205

Merged
merged 1 commit into from

2 participants

@edwardcapriolo
Collaborator

This is a new feature to write resutls directly to c* rather then reading them back to the client. In the mean time I fixed several other issues like consistency level was not being respected by handlers, magic strings. The usual :)

@zznate zznate merged commit 7c84acc into from
@zznate zznate deleted the branch
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on May 22, 2013
  1. @edwardcapriolo
This page is out of date. Refresh to see the latest.
View
3  src/main/java/org/usergrid/vx/experimental/IntraOp.java
@@ -84,7 +84,8 @@ public Type getType() {
EXECUTEPREPARED ,
CREATESCANFILTER,
NOOP ,
- SETKEYSPACE
+ SETKEYSPACE,
+ RESULTMODE
};
}
View
13 src/main/java/org/usergrid/vx/experimental/Operations.java
@@ -31,8 +31,8 @@
@SuppressWarnings("rawtypes")
public class Operations {
- private static final String KEYSPACE = "keyspace";
- private static final String COLUMN_FAMILY = "columnfamily";
+ public static final String KEYSPACE = "keyspace";
+ public static final String COLUMN_FAMILY = "columnfamily";
public static final String ROWKEY = "rowkey";
public static final String NAME = "name";
public static final String VALUE = "value";
@@ -50,7 +50,7 @@
private static final String PROCESSORNAME = "processorname";
private static final String PARAMS = "params";
private static final String INPUT = "input";
- private static final String ON = "on";
+ public static final String ON = "on";
private static final String QUERY = "query";
private static final String VERSION = "version";
private static final String TRANSPOSE = "transpose";
@@ -288,6 +288,13 @@ public static IntraOp serviceProcess(String name, Map params){
return new IntraOp(IntraOp.Type.SERVICEPROCESS).set(NAME, name).set(PARAMS, params);
}
+ public static IntraOp resultMode(String ks, String cf, boolean on){
+ return new IntraOp(IntraOp.Type.RESULTMODE)
+ .set(Operations.KEYSPACE, ks)
+ .set(Operations.COLUMN_FAMILY, cf)
+ .set(Operations.ON, on);
+ }
+
public static IntraOp componentSelect(Set<String> components){
return new IntraOp(IntraOp.Type.COMPONENTSELECT).set("components", components);
}
View
2  src/main/java/org/usergrid/vx/server/operations/BatchHandler.java
@@ -37,7 +37,7 @@ public void handle(Message<JsonObject> event) {
}
mutations.add(rm);
}
- HandlerUtils.write(mutations, event, id);
+ HandlerUtils.instance.write(mutations, event, id, state);
}
}
View
14 src/main/java/org/usergrid/vx/server/operations/ConsistencyHandler.java
@@ -1,21 +1,21 @@
package org.usergrid.vx.server.operations;
-import org.vertx.java.core.Handler;
+import org.usergrid.vx.experimental.Operations;
import org.vertx.java.core.eventbus.Message;
import org.vertx.java.core.json.JsonObject;
/*
* The consistency verb changes the consistency level of the state.
*/
-public class ConsistencyHandler implements Handler<Message<JsonObject>> {
+public class ConsistencyHandler extends AbstractIntravertHandler {
@Override
- public void handle(Message<JsonObject> event) {
+ public void handleUser(Message<JsonObject> event) {
Integer id = event.body.getInteger("id");
- JsonObject params = event.body.getObject("op");
- JsonObject state = event.body.getObject("state");
- state.putString("consistency", params.getString("level") );
+ JsonObject params = event.body.getObject(Operations.OP);
+ JsonObject state = event.body.getObject(Operations.STATE);
+ HandlerUtils.instance.setConsistencyLevel(state, params.getString("level"));
event.reply(new JsonObject().putString(id.toString(), "OK").putObject(
- "state", state));
+ Operations.STATE, state));
}
}
View
2  src/main/java/org/usergrid/vx/server/operations/CounterHandler.java
@@ -30,6 +30,6 @@ public void handleUser(Message<JsonObject> event) {
Long.parseLong(params.toMap().get("value").toString()));
List<IMutation> mutations = new ArrayList<IMutation>(1);
mutations.add(new CounterMutation(rm, HandlerUtils.instance.determineConsistencyLevel(state)));
- HandlerUtils.write(mutations, event, id);
+ HandlerUtils.instance.write(mutations, event, id, state);
}
}
View
35 src/main/java/org/usergrid/vx/server/operations/HandlerUtils.java
@@ -34,6 +34,9 @@
// todo will this be reloadable?
public static HandlerUtils instance;
public static Map<String,Filter> filters = new HashMap<String,Filter>();
+
+ private static final String resultMode = "resultMode";
+ private static final String consistency = "consitency";
static {
instance = new HandlerUtils();
}
@@ -54,10 +57,34 @@ public void activateFilter(JsonObject state, String filterName){
state.putString("currentFilter", filterName);
}
+ public JsonObject getResultMode(JsonObject state){
+ return state.getObject(resultMode);
+ }
public void deactivateFilter(JsonObject state){
state.removeField("currentFilter");
}
+ public void activateResultMode(JsonObject state, String keyspace, String columnFamily) {
+ state.putObject(resultMode, new JsonObject().putString(Operations.KEYSPACE, keyspace)
+ .putString(Operations.COLUMN_FAMILY, columnFamily));
+ }
+
+ public void deactivateResultMode(JsonObject state){
+ state.removeField(resultMode);
+ }
+
+ public void setConsistencyLevel(JsonObject state, String level){
+ state.putString(consistency, level );
+ }
+
+ public ConsistencyLevel getConsistencyLevel(JsonObject state){
+ String cls = state.getString(consistency);
+ if (cls != null){
+ return ConsistencyLevel.valueOf(cls);
+ }
+ return ConsistencyLevel.ONE;
+ }
+
/*
* because handlers can not see the responses of other steps easily anymore we move this logic
* here. Essentially find all res ref objects and replace them
@@ -266,11 +293,9 @@ public void readCf(ColumnFamily columnFamily, JsonObject state, EventBus eb,
for (IColumn column : columnFamily) {
if (column.isLive()) {
- HashMap m = new HashMap();
-
+ HashMap<String,Object> m = new HashMap<>(4);
if (components.contains("name")) {
JsonObject columnMetadata = findMetaData(columnFamily, state, "column");
-
if (columnMetadata == null) {
m.put("name", ByteBufferUtil.getArray(column.name()));
} else {
@@ -312,10 +337,8 @@ public void readCf(ColumnFamily columnFamily, JsonObject state, EventBus eb,
eb.send("filters." + filter, array, filterReplyHandler);
}
- public static void write(List<IMutation> mutations, Message<JsonObject> event, Integer id) {
+ public void write(List<IMutation> mutations, Message<JsonObject> event, Integer id, JsonObject state) {
try {
- // We don't want to hard code the consistency level but letting it slide for
- // since it is also hard coded in IntraState
StorageProxy.mutate(mutations, ConsistencyLevel.ONE);
event.reply(new JsonObject().putString(id.toString(), "OK"));
View
11 src/main/java/org/usergrid/vx/server/operations/ReadHandler.java
@@ -10,11 +10,8 @@
public class ReadHandler {
private JsonObject params;
-
private JsonObject state;
-
private Message<JsonObject> event;
-
private EventBus eb;
public ReadHandler(Message<JsonObject> event, EventBus eb) {
@@ -27,15 +24,19 @@ public ReadHandler(Message<JsonObject> event, EventBus eb) {
public void handleRead(ColumnFamily cf) {
final Integer id = event.body.getInteger("id");
JsonArray array;
-
if (cf == null) {
event.reply(new JsonObject().putArray(id.toString(), new JsonArray()));
} else {
String filter = state.getString("currentFilter");
if (filter == null) {
array = HandlerUtils.instance.readCf(cf, state, params);
+ JsonObject resultMode = HandlerUtils.instance.getResultMode(state);
JsonObject response = new JsonObject();
- response.putArray(id.toString(), array);
+ if (resultMode == null){
+ response.putArray(id.toString(), array);
+ } else {
+ //send to batch handler?
+ }
event.reply(response);
} else {
HandlerUtils.instance.readCf(cf, state, eb, new Handler<Message<JsonArray>>() {
View
26 src/main/java/org/usergrid/vx/server/operations/ResultModeHandler.java
@@ -0,0 +1,26 @@
+package org.usergrid.vx.server.operations;
+
+import org.usergrid.vx.experimental.Operations;
+import org.vertx.java.core.eventbus.Message;
+import org.vertx.java.core.json.JsonObject;
+
+public class ResultModeHandler extends AbstractIntravertHandler {
+
+ /*
+ * (non-Javadoc)
+ * @see org.usergrid.vx.server.operations.AbstractIntravertHandler#handleUser(org.vertx.java.core.eventbus.Message)
+ * req.add(Operations.resultMode("rmks", "resultcf", true));
+ */
+ @Override
+ public void handleUser(Message<JsonObject> event) {
+ JsonObject params = event.body.getObject(Operations.OP);
+ JsonObject state = event.body.getObject(Operations.STATE);
+ boolean on = params.getBoolean(Operations.ON);
+ if (on) {
+ HandlerUtils.instance.activateResultMode(state, params.getString(Operations.KEYSPACE), params.getString(Operations.COLUMN_FAMILY));
+ } else {
+ HandlerUtils.instance.deactivateResultMode(state);
+ }
+ }
+
+}
View
2  src/main/java/org/usergrid/vx/server/operations/SetHandler.java
@@ -32,6 +32,6 @@ public void handleUser(Message<JsonObject> event) {
}
List<IMutation> mutations = new ArrayList<IMutation>();
mutations.add(rm);
- HandlerUtils.write(mutations, event, id);
+ HandlerUtils.instance.write(mutations, event, id, state);
}
}
View
52 src/test/java/org/usergrid/vx/experimental/ResultModeITest.java
@@ -0,0 +1,52 @@
+package org.usergrid.vx.experimental;
+
+import java.util.List;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.usergrid.vx.client.IntraClient2;
+
+@RunWith(CassandraRunner.class)
+@RequiresKeyspace(ksName = "rmks")
+@RequiresColumnFamily(ksName = "rmks", cfName = "rmcf")
+public class ResultModeITest {
+
+ @Ignore
+ @Test
+ @RequiresColumnFamily(ksName = "rmks", cfName = "rmcf")
+
+ public void filterTest() throws Exception {
+
+ IntraReq preq = new IntraReq();
+ preq.add(Operations.setKeyspaceOp("rmks"));
+ preq.add(Operations.createCfOp("resultcf"));
+ IntraClient2 ic2 = new IntraClient2("localhost", 8080);
+ ic2.sendBlocking(preq);
+
+ IntraReq req = new IntraReq();
+ req.add(Operations.setKeyspaceOp("rmks"));
+ req.add(Operations.setColumnFamilyOp("rmcf"));
+ req.add(Operations.setAutotimestampOp(true));
+ req.add(Operations.assumeOp("rmks", "rmcf", "value", "UTF8Type"));
+ req.add(Operations.setOp("rowa", "col1", "20"));
+ req.add(Operations.setOp("rowa", "col2", "22"));
+ req.add(Operations
+ .createFilterOp("over21", "groovy",
+ "{ row -> if (row['value'].toInteger() > 21) return row else return null }")); // 8
+ req.add(Operations.filterModeOp("over21", true));
+ req.add(Operations.resultMode("rmks", "resultcf", true));
+ req.add(Operations.sliceOp("rowa", "col1", "col3", 10));
+ IntraRes res = ic2.sendBlocking(req);
+
+ IntraReq r3 = new IntraReq();
+ r3.add(Operations.setKeyspaceOp("rmks"));
+ r3.add(Operations.setColumnFamilyOp("resultcf"));
+ req.add(Operations.sliceOp("rowa", "col1", "col3", 10).set(Operations.USER_OP_ID, "wombats"));
+ IntraRes res3 = ic2.sendBlocking(r3);
+ List<Map> results = (List<Map>) res3.getOpsRes().get("wombats");
+ Assert.assertEquals("22", results.get(0).get("value"));
+ Assert.assertEquals(1, results.size());
+ }
+}
Something went wrong with that request. Please try again.